taskdeletedup with streams
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
ale 2024-10-19 20:37:54 +02:00
parent b8f927be2b
commit 339ab9c192

View File

@@ -1,12 +1,19 @@
module.exports = client => { module.exports = client => {
const constant = require('./constant'), const constant = require('./constant'),
schedule = require('node-schedule'), schedule = require('node-schedule'),
{ pick } = require('stream-json/filters/Pick'),
{ parser } = require('stream-json'),
{ streamArray } = require('stream-json/streamers/StreamArray'),
{ chain } = require('stream-chain'),
size = 500, size = 500,
deleteDup = async () => { deleteDup = async () => {
const count = { deleted: 0, total: 0 } const count = { deleted: 0, total: 0, current: 0 }
let lastsort = undefined, result = undefined, last = undefined let lastsort = undefined, last = undefined
do { await searchDup(count, lastsort, last)
result = await client.search({ },
searchDup = async (count, lastsort, last) => {
count.current = 0
const result = await client.search({
index: constant.index, index: constant.index,
size: size, size: size,
body: { body: {
@@ -25,26 +32,36 @@ module.exports = client => {
}], }],
search_after: lastsort search_after: lastsort
} }
}) }, { asStream: true, meta: false }),
for (const instance of result.hits.hits) { pipeline = chain([
if (last && instance._source && last.instance === instance._source.instance) { parser(),
await client.delete({ index: constant.index, id: instance._id }) pick({ filter: 'hits.hits' }),
streamArray(),
data => data.value
])
pipeline.on('data', async data => {
count.current++
if (last && last.instance === data._source.instance) {
await client.delete({ index: constant.index, id: data._id })
count.deleted++ count.deleted++
console.log('deleted ' + instance._id + ': ' + instance._source.instance) console.log('deleted ' + data._id + ': ' + data._source.instance)
} }
else { else {
last = instance._source last = data._source
} }
if (instance._id === result.hits.hits[result.hits.hits.length - 1]._id) { if (count.current === size) {
lastsort = instance.sort lastsort = data.sort
} }
} })
count.total += result.hits.hits.length pipeline.on('end', async () => {
if (result.hits.hits.length !== size) { count.total += size
break if (count.current === size) {
} await searchDup(count, lastsort, last)
} while (result.hits && result.hits.hits && result.hits.hits.length > 0) } else {
console.log('Index: ' + constant.index + ' - Total: ' + count.total + ' - Deleted: ' + count.deleted) console.log('Index: ' + constant.index + ' - Total: ' + count.total + ' - Deleted: ' + count.deleted)
}
})
result.pipe(pipeline)
}, },
job = schedule.scheduleJob('0 ' + constant.taskdeletedup + ' * * *', async () => { job = schedule.scheduleJob('0 ' + constant.taskdeletedup + ' * * *', async () => {
await deleteDup() await deleteDup()