fediblock-instance/lib/taskdeletedup.js
ale 67f43482fe
All checks were successful
continuous-integration/drone/push Build is passing
count.current
2024-10-20 07:36:05 +02:00

70 lines
2.7 KiB
JavaScript

module.exports = client => {
const constant = require('./constant'),
schedule = require('node-schedule'),
{ pick } = require('stream-json/filters/Pick'),
{ parser } = require('stream-json'),
{ streamArray } = require('stream-json/streamers/StreamArray'),
{ chain } = require('stream-chain'),
size = 500,
deleteDup = async () => {
const count = { deleted: 0, total: 0, current: 0 }
let lastsort = undefined, last = undefined
await searchDup(count, lastsort, last)
},
searchDup = async (count, lastsort, last) => {
count.current = 0
const result = await client.search({
index: constant.index,
size: size,
body: {
query: {
match_all: {}
},
sort: [{
"instance": {
"order": "asc"
},
"last": {
"order": "desc",
"numeric_type": "date_nanos",
"format": "strict_date_optional_time_nanos"
}
}],
search_after: lastsort
}
}, { asStream: true, meta: false }),
pipeline = chain([
parser(),
pick({ filter: 'hits.hits' }),
streamArray(),
data => data.value
])
pipeline.on('data', async data => {
count.current++
if (last && last.instance === data._source.instance) {
await client.delete({ index: constant.index, id: data._id })
count.deleted++
console.log('deleted ' + data._id + ': ' + data._source.instance)
}
else {
last = data._source
}
if (count.current === size) {
lastsort = data.sort
}
})
pipeline.on('end', async () => {
count.total += count.current
if (count.current === size) {
await searchDup(count, lastsort, last)
} else {
console.log('Index: ' + constant.index + ' - Total: ' + count.total + ' - Deleted: ' + count.deleted)
}
})
result.pipe(pipeline)
},
job = schedule.scheduleJob('0 ' + constant.taskdeletedup + ' * * *', async () => {
await deleteDup()
})
}