@@ -128,50 +128,97 @@ async function findDuplicatesForField(
|
||||
// Use aggregation to find duplicate values
|
||||
const fieldToAggregate = field === 'plaintext' ? 'plaintext.keyword' : field;
|
||||
|
||||
const response = await client.search({
|
||||
index: INDEX_NAME,
|
||||
size: 0,
|
||||
aggs: {
|
||||
duplicates: {
|
||||
terms: {
|
||||
field: fieldToAggregate,
|
||||
min_doc_count: 2,
|
||||
size: batchSize
|
||||
// Use composite aggregation to handle large number of duplicates
|
||||
let afterKey: any = undefined;
|
||||
let hasMore = true;
|
||||
|
||||
console.log(` Scanning for duplicates...`);
|
||||
|
||||
while (hasMore) {
|
||||
const aggQuery: any = {
|
||||
index: INDEX_NAME,
|
||||
size: 0,
|
||||
aggs: {
|
||||
duplicates: {
|
||||
composite: {
|
||||
size: batchSize,
|
||||
sources: [
|
||||
{ value: { terms: { field: fieldToAggregate } } }
|
||||
],
|
||||
...(afterKey && { after: afterKey })
|
||||
},
|
||||
aggs: {
|
||||
doc_count_filter: {
|
||||
bucket_selector: {
|
||||
buckets_path: { count: '_count' },
|
||||
script: 'params.count > 1'
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const response = await client.search(aggQuery);
|
||||
const compositeAgg = response.aggregations?.duplicates as any;
|
||||
const buckets = compositeAgg?.buckets || [];
|
||||
|
||||
for (const bucket of buckets) {
|
||||
if (bucket.doc_count > 1) {
|
||||
const value = bucket.key.value;
|
||||
|
||||
// Use scroll API for large result sets
|
||||
const documentIds: string[] = [];
|
||||
|
||||
let scrollResponse = await client.search({
|
||||
index: INDEX_NAME,
|
||||
scroll: '1m',
|
||||
size: 1000,
|
||||
query: {
|
||||
term: {
|
||||
[fieldToAggregate]: value
|
||||
}
|
||||
},
|
||||
sort: [
|
||||
{ created_at: { order: 'asc' } }
|
||||
],
|
||||
_source: false
|
||||
});
|
||||
|
||||
while (scrollResponse.hits.hits.length > 0) {
|
||||
documentIds.push(...scrollResponse.hits.hits.map((hit: any) => hit._id));
|
||||
|
||||
if (!scrollResponse._scroll_id) break;
|
||||
|
||||
scrollResponse = await client.scroll({
|
||||
scroll_id: scrollResponse._scroll_id,
|
||||
scroll: '1m'
|
||||
});
|
||||
}
|
||||
|
||||
// Clear scroll
|
||||
if (scrollResponse._scroll_id) {
|
||||
await client.clearScroll({ scroll_id: scrollResponse._scroll_id }).catch(() => {});
|
||||
}
|
||||
|
||||
if (documentIds.length > 1) {
|
||||
duplicates.push({
|
||||
value: String(value),
|
||||
field,
|
||||
documentIds,
|
||||
keepId: documentIds[0], // Keep the oldest
|
||||
deleteIds: documentIds.slice(1) // Delete the rest
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
const buckets = (response.aggregations?.duplicates as any)?.buckets || [];
|
||||
// Check if there are more results
|
||||
afterKey = compositeAgg?.after_key;
|
||||
hasMore = buckets.length === batchSize && afterKey;
|
||||
|
||||
for (const bucket of buckets) {
|
||||
const value = bucket.key;
|
||||
|
||||
// Get all documents with this value, sorted by created_at
|
||||
const docsResponse = await client.search({
|
||||
index: INDEX_NAME,
|
||||
size: bucket.doc_count,
|
||||
query: {
|
||||
term: {
|
||||
[fieldToAggregate]: value
|
||||
}
|
||||
},
|
||||
sort: [
|
||||
{ created_at: { order: 'asc' } }
|
||||
],
|
||||
_source: ['plaintext', 'md5', 'sha1', 'sha256', 'sha512', 'created_at']
|
||||
});
|
||||
|
||||
const documentIds = docsResponse.hits.hits.map((hit: any) => hit._id);
|
||||
|
||||
if (documentIds.length > 1) {
|
||||
duplicates.push({
|
||||
value: String(value),
|
||||
field,
|
||||
documentIds,
|
||||
keepId: documentIds[0], // Keep the oldest
|
||||
deleteIds: documentIds.slice(1) // Delete the rest
|
||||
});
|
||||
if (hasMore) {
|
||||
process.stdout.write(`\r Found ${duplicates.length} duplicate groups so far...`);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Referencia en una nueva incidencia
Block a user