module.exports = client => { const constant = require('./constant'), schedule = require('node-schedule'), { pick } = require('stream-json/filters/Pick'), { parser } = require('stream-json'), { streamArray } = require('stream-json/streamers/StreamArray'), { chain } = require('stream-chain'), size = 500, deleteDup = async () => { const count = { deleted: 0, total: 0, current: 0 } let lastsort = undefined, last = undefined await searchDup(count, lastsort, last) }, searchDup = async (count, lastsort, last) => { count.current = 0 const result = await client.search({ index: constant.index, size: size, body: { query: { match_all: {} }, sort: [{ "instance": { "order": "asc" }, "last": { "order": "desc", "numeric_type": "date_nanos", "format": "strict_date_optional_time_nanos" } }], search_after: lastsort } }, { asStream: true, meta: false }), pipeline = chain([ parser(), pick({ filter: 'hits.hits' }), streamArray(), data => data.value ]) pipeline.on('data', async data => { count.current++ if (last && last.instance === data._source.instance) { await client.delete({ index: constant.index, id: data._id }) count.deleted++ console.log('deleted ' + data._id + ': ' + data._source.instance) } else { last = data._source } if (count.current === size) { lastsort = data.sort } }) pipeline.on('end', async () => { count.total += count.current if (count.current === size) { await searchDup(count, lastsort, last) } else { console.log('Index: ' + constant.index + ' - Total: ' + count.total + ' - Deleted: ' + count.deleted) } }) result.pipe(pipeline) }, job = schedule.scheduleJob('0 ' + constant.taskdeletedup + ' * * *', async () => { await deleteDup() }) }