diff --git a/scripts/remove-duplicates.ts b/scripts/remove-duplicates.ts
index 2f387fd..7f2d3d0 100644
--- a/scripts/remove-duplicates.ts
+++ b/scripts/remove-duplicates.ts
@@ -128,50 +128,97 @@ async function findDuplicatesForField(
   // Use aggregation to find duplicate values
   const fieldToAggregate = field === 'plaintext' ? 'plaintext.keyword' : field;
 
-  const response = await client.search({
-    index: INDEX_NAME,
-    size: 0,
-    aggs: {
-      duplicates: {
-        terms: {
-          field: fieldToAggregate,
-          min_doc_count: 2,
-          size: batchSize
+  // Use composite aggregation to handle large number of duplicates
+  let afterKey: any = undefined;
+  let hasMore = true;
+
+  console.log(`  Scanning for duplicates...`);
+
+  while (hasMore) {
+    const aggQuery: any = {
+      index: INDEX_NAME,
+      size: 0,
+      aggs: {
+        duplicates: {
+          composite: {
+            size: batchSize,
+            sources: [
+              { value: { terms: { field: fieldToAggregate } } }
+            ],
+            ...(afterKey && { after: afterKey })
+          },
+          aggs: {
+            doc_count_filter: {
+              bucket_selector: {
+                buckets_path: { count: '_count' },
+                script: 'params.count > 1'
+              }
+            }
+          }
+        }
+      }
+    };
+
+    const response = await client.search(aggQuery);
+    const compositeAgg = response.aggregations?.duplicates as any;
+    const buckets = compositeAgg?.buckets || [];
+
+    for (const bucket of buckets) {
+      if (bucket.doc_count > 1) {
+        const value = bucket.key.value;
+
+        // Use scroll API for large result sets
+        const documentIds: string[] = [];
+
+        let scrollResponse = await client.search({
+          index: INDEX_NAME,
+          scroll: '1m',
+          size: 1000,
+          query: {
+            term: {
+              [fieldToAggregate]: value
+            }
+          },
+          sort: [
+            { created_at: { order: 'asc' } }
+          ],
+          _source: false
+        });
+
+        while (scrollResponse.hits.hits.length > 0) {
+          documentIds.push(...scrollResponse.hits.hits.map((hit: any) => hit._id));
+
+          if (!scrollResponse._scroll_id) break;
+
+          scrollResponse = await client.scroll({
+            scroll_id: scrollResponse._scroll_id,
+            scroll: '1m'
+          });
+        }
+
+        // Clear scroll
+        if (scrollResponse._scroll_id) {
+          await client.clearScroll({ scroll_id: scrollResponse._scroll_id }).catch(() => {});
+        }
+
+        if (documentIds.length > 1) {
+          duplicates.push({
+            value: String(value),
+            field,
+            documentIds,
+            keepId: documentIds[0], // Keep the oldest
+            deleteIds: documentIds.slice(1) // Delete the rest
+          });
         }
       }
     }
-  });
 
-  const buckets = (response.aggregations?.duplicates as any)?.buckets || [];
-
-  for (const bucket of buckets) {
-    const value = bucket.key;
+    // Check if there are more results
+    afterKey = compositeAgg?.after_key;
+    hasMore = buckets.length === batchSize && afterKey;
 
-    // Get all documents with this value, sorted by created_at
-    const docsResponse = await client.search({
-      index: INDEX_NAME,
-      size: bucket.doc_count,
-      query: {
-        term: {
-          [fieldToAggregate]: value
-        }
-      },
-      sort: [
-        { created_at: { order: 'asc' } }
-      ],
-      _source: ['plaintext', 'md5', 'sha1', 'sha256', 'sha512', 'created_at']
-    });
-
-    const documentIds = docsResponse.hits.hits.map((hit: any) => hit._id);
-
-    if (documentIds.length > 1) {
-      duplicates.push({
-        value: String(value),
-        field,
-        documentIds,
-        keepId: documentIds[0], // Keep the oldest
-        deleteIds: documentIds.slice(1) // Delete the rest
-      });
+    if (hasMore) {
+      process.stdout.write(`\r  Found ${duplicates.length} duplicate groups so far...`);
     }
   }