diff --git a/lib/taskdeletedup.js b/lib/taskdeletedup.js index 6f3cf92..9162f4d 100644 --- a/lib/taskdeletedup.js +++ b/lib/taskdeletedup.js @@ -1,50 +1,67 @@ module.exports = client => { const constant = require('./constant'), schedule = require('node-schedule'), + { pick } = require('stream-json/filters/Pick'), + { parser } = require('stream-json'), + { streamArray } = require('stream-json/streamers/StreamArray'), + { chain } = require('stream-chain'), size = 500, deleteDup = async () => { - const count = { deleted: 0, total: 0 } - let lastsort = undefined, result = undefined, last = undefined - do { - result = await client.search({ - index: constant.index, - size: size, - body: { - query: { - match_all: {} + const count = { deleted: 0, total: 0, current: 0 } + let lastsort = undefined, last = undefined + await searchDup(count, lastsort, last) + }, + searchDup = async (count, lastsort, last) => { + count.current = 0 + const result = await client.search({ + index: constant.index, + size: size, + body: { + query: { + match_all: {} + }, + sort: [{ + "instance": { + "order": "asc" }, - sort: [{ - "instance": { - "order": "asc" - }, - "last": { - "order": "desc", - "numeric_type": "date_nanos", - "format": "strict_date_optional_time_nanos" - } - }], - search_after: lastsort - } - }) - for (const instance of result.hits.hits) { - if (last && instance._source && last.instance === instance._source.instance) { - await client.delete({ index: constant.index, id: instance._id }) - count.deleted++ - console.log('deleted ' + instance._id + ': ' + instance._source.instance) - } - else { - last = instance._source - } - if (instance._id === result.hits.hits[result.hits.hits.length - 1]._id) { - lastsort = instance.sort - } + "last": { + "order": "desc", + "numeric_type": "date_nanos", + "format": "strict_date_optional_time_nanos" + } + }], + search_after: lastsort } - count.total += result.hits.hits.length - if (result.hits.hits.length !== size) { - break + }, { asStream: true, meta: false }), + pipeline = chain([ + parser(), + pick({ filter: 'hits.hits' }), + streamArray(), + data => data.value + ]) + pipeline.on('data', async data => { + count.current++ + if (last && last.instance === data._source.instance) { + await client.delete({ index: constant.index, id: data._id }) + count.deleted++ + console.log('deleted ' + data._id + ': ' + data._source.instance) } - } while (result.hits && result.hits.hits && result.hits.hits.length > 0) - console.log('Index: ' + constant.index + ' - Total: ' + count.total + ' - Deleted: ' + count.deleted) + else { + last = data._source + } + if (count.current === size) { + lastsort = data.sort + } + }) + pipeline.on('end', async () => { + count.total += size + if (count.current === size) { + await searchDup(count, lastsort, last) + } else { + console.log('Index: ' + constant.index + ' - Total: ' + count.total + ' - Deleted: ' + count.deleted) + } + }) + result.pipe(pipeline) }, job = schedule.scheduleJob('0 ' + constant.taskdeletedup + ' * * *', async () => { await deleteDup()