#!/usr/bin/env node
/**
 * Hasher Duplicate Remover Script
 *
 * This script finds and removes duplicate entries from the Elasticsearch index.
 * It identifies duplicates by checking plaintext, md5, sha1, sha256, and sha512 fields.
 *
 * Usage:
 *   npx tsx scripts/remove-duplicates.ts [options]
 *   npm run remove-duplicates [-- options]
 *
 * Options:
 *   --dry-run              Show duplicates without removing them (default)
 *   --execute              Actually remove the duplicates
 *   --batch-size=<number>  Number of items to process in each batch (default: 1000)
 *   --field=<field>        Check duplicates only on this field (plaintext, md5, sha1, sha256, sha512)
 *   --help, -h             Show this help message
 */

import { Client } from '@elastic/elasticsearch';

const ELASTICSEARCH_NODE = process.env.ELASTICSEARCH_NODE || 'http://localhost:9200';
const INDEX_NAME = 'hasher';
const DEFAULT_BATCH_SIZE = 1000;

/** Page size used when scrolling through the documents that share one duplicated value. */
const SCROLL_PAGE_SIZE = 1000;

/** Fields that may be checked for duplicates; all of them are checked when --field is omitted. */
const VALID_FIELDS = ['plaintext', 'md5', 'sha1', 'sha256', 'sha512'];

interface ParsedArgs {
  dryRun: boolean;
  batchSize: number;
  field: string | null;
  showHelp: boolean;
}

/** A set of documents that share the same value for `field`. */
interface DuplicateGroup {
  value: string;
  field: string;
  /** Every matching document ID, sorted by created_at ascending (oldest first). */
  documentIds: string[];
  /** The document that is kept. */
  keepId: string;
  /** The documents scheduled for deletion. */
  deleteIds: string[];
}

/** The subset of one composite-aggregation page that this script reads. */
interface CompositeAggPage {
  after_key?: Record<string, unknown>;
  buckets?: Array<{ key: { value: string | number }; doc_count: number }>;
}

/**
 * Triggers a garbage collection when `global.gc` is exposed (node --expose-gc).
 * @returns true when a collection was actually triggered
 */
function tryGarbageCollect(): boolean {
  if (global.gc) {
    global.gc();
    return true;
  }
  return false;
}

/** Parses a strictly positive base-10 integer; returns null for anything else. */
function parsePositiveInt(raw: string): number | null {
  const parsed = parseInt(raw, 10);
  return !isNaN(parsed) && parsed > 0 ? parsed : null;
}

/**
 * Parses CLI arguments.
 *
 * Both `--opt=value` and `--opt value` forms are accepted for --batch-size and --field.
 * Invalid batch sizes are ignored (the default is kept), and later flags win,
 * so `--execute --dry-run` results in a dry run.
 */
function parseArgs(args: string[]): ParsedArgs {
  const result: ParsedArgs = {
    dryRun: true,
    batchSize: DEFAULT_BATCH_SIZE,
    field: null,
    showHelp: false
  };

  for (let i = 0; i < args.length; i++) {
    const arg = args[i];
    if (arg === undefined) continue;
    const nextArg = args[i + 1];

    if (arg === '--help' || arg === '-h') {
      result.showHelp = true;
    } else if (arg === '--dry-run') {
      result.dryRun = true;
    } else if (arg === '--execute') {
      result.dryRun = false;
    } else if (arg.startsWith('--batch-size=')) {
      const parsed = parsePositiveInt(arg.slice('--batch-size='.length));
      if (parsed !== null) {
        result.batchSize = parsed;
      }
    } else if (arg === '--batch-size') {
      // Only consume the next token when it is a valid value rather than another flag.
      const parsed = nextArg && !nextArg.startsWith('-') ? parsePositiveInt(nextArg) : null;
      if (parsed !== null) {
        result.batchSize = parsed;
        i++;
      }
    } else if (arg.startsWith('--field=')) {
      // slice (not split) so a value containing '=' is kept whole and then rejected by validation
      result.field = arg.slice('--field='.length);
    } else if (arg === '--field') {
      if (nextArg && !nextArg.startsWith('-')) {
        result.field = nextArg;
        i++;
      }
    }
  }

  return result;
}

/** Prints usage information and exits with status 0. */
function showHelp(): never {
  console.log(`
Hasher Duplicate Remover Script

Usage:
  npx tsx scripts/remove-duplicates.ts [options]
  npm run remove-duplicates [-- options]

Options:
  --dry-run              Show duplicates without removing them (default)
  --execute              Actually remove the duplicates
  --batch-size=<number>  Number of items to process in each batch (default: ${DEFAULT_BATCH_SIZE})
  --field=<field>        Check duplicates only on this field
                         Valid fields: ${VALID_FIELDS.join(', ')}
  --help, -h             Show this help message

Environment Variables:
  ELASTICSEARCH_NODE     Elasticsearch node URL (default: http://localhost:9200)

Examples:
  npx tsx scripts/remove-duplicates.ts                    # Dry run, show all duplicates
  npx tsx scripts/remove-duplicates.ts --execute          # Remove all duplicates
  npx tsx scripts/remove-duplicates.ts --field=md5        # Check only md5 duplicates
  npx tsx scripts/remove-duplicates.ts --execute --field=plaintext

Notes:
  - The script keeps the OLDEST document (by created_at) and removes newer duplicates
  - Always run with --dry-run first to review what will be deleted
  - Duplicates are checked across all hash fields by default
`);
  process.exit(0);
}

/**
 * Returns the IDs of every document whose `field` exactly equals `value`,
 * sorted by created_at ascending, so index 0 is the oldest.
 *
 * The scroll context is always released, even when a page request fails.
 */
async function collectDocumentIds(
  client: Client,
  field: string,
  value: string | number
): Promise<string[]> {
  const documentIds: string[] = [];

  let scrollResponse = await client.search({
    index: INDEX_NAME,
    scroll: '1m',
    size: SCROLL_PAGE_SIZE,
    query: { term: { [field]: value } },
    sort: [{ created_at: { order: 'asc' } }],
    _source: false
  });
  let scrollId = scrollResponse._scroll_id;

  try {
    while (scrollResponse.hits.hits.length > 0) {
      for (const hit of scrollResponse.hits.hits) {
        if (typeof hit._id === 'string') {
          documentIds.push(hit._id);
        }
      }
      if (!scrollId) break;
      scrollResponse = await client.scroll({ scroll_id: scrollId, scroll: '1m' });
      scrollId = scrollResponse._scroll_id ?? scrollId;
    }
  } finally {
    if (scrollId) {
      // Best effort: an unreleased context would otherwise expire after the 1m keep-alive.
      await client.clearScroll({ scroll_id: scrollId }).catch(() => {});
    }
  }

  return documentIds;
}

/**
 * Finds every value of `field` that occurs in more than one document.
 *
 * A composite aggregation pages through every distinct value, and buckets with a
 * doc_count of 1 are skipped client-side. Composite aggregations are not compatible
 * with pipeline aggregations such as bucket_selector, so filtering happens here instead.
 *
 * @param batchSize composite page size (distinct values per request)
 * @returns one group per duplicated value; keepId is the oldest document
 */
async function findDuplicatesForField(
  client: Client,
  field: string,
  batchSize: number
): Promise<DuplicateGroup[]> {
  const duplicates: DuplicateGroup[] = [];

  // plaintext is aggregated/matched via its keyword sub-field (presumably mapped as text);
  // the hash fields are used as-is — assumed to be keyword-mapped, verify against the index mapping.
  const fieldToAggregate = field === 'plaintext' ? 'plaintext.keyword' : field;

  let afterKey: Record<string, unknown> | undefined;
  let hasMore = true;
  let wroteProgress = false;

  console.log(` Scanning for duplicates...`);

  while (hasMore) {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- composite `after` typing differs across client versions
    const aggQuery: any = {
      index: INDEX_NAME,
      size: 0,
      aggs: {
        duplicates: {
          composite: {
            size: batchSize,
            sources: [{ value: { terms: { field: fieldToAggregate } } }],
            ...(afterKey !== undefined ? { after: afterKey } : {})
          }
        }
      }
    };

    const response = await client.search(aggQuery);
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- aggregate typings are a broad union
    const page: CompositeAggPage | undefined = (response.aggregations as any)?.duplicates;
    const buckets = page?.buckets ?? [];

    for (const bucket of buckets) {
      if (bucket.doc_count <= 1) continue;

      const value = bucket.key.value;
      const documentIds = await collectDocumentIds(client, fieldToAggregate, value);
      const keepId = documentIds[0];

      if (keepId !== undefined && documentIds.length > 1) {
        duplicates.push({
          value: String(value),
          field,
          documentIds,
          keepId, // Keep the oldest
          deleteIds: documentIds.slice(1) // Delete the rest
        });
      }
    }

    // Keep paging while Elasticsearch hands back a cursor; an empty page ends the scan.
    afterKey = page?.after_key;
    hasMore = afterKey !== undefined && buckets.length > 0;

    if (hasMore) {
      process.stdout.write(`\r Found ${duplicates.length} duplicate groups so far...`);
      wroteProgress = true;
    }
  }

  if (wroteProgress) {
    // Terminate the carriage-return progress line before further output.
    process.stdout.write('\n');
  }

  return duplicates;
}

/**
 * Phase 1: Initialize and connect to Elasticsearch.
 * @returns the connected client and the index's document count before any deletion
 */
async function phase1_InitAndConnect(): Promise<{ client: Client; totalDocuments: number }> {
  console.log(`🔍 Hasher Duplicate Remover - Phase 1: Initialization`);
  console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
  console.log(`Elasticsearch: ${ELASTICSEARCH_NODE}`);
  console.log(`Index: ${INDEX_NAME}`);
  console.log('');

  const client = new Client({ node: ELASTICSEARCH_NODE });

  console.log('🔗 Connecting to Elasticsearch...');
  await client.cluster.health({});
  console.log('✅ Connected successfully\n');

  const countResponse = await client.count({ index: INDEX_NAME });
  console.log(`📊 Total documents in index: ${countResponse.count}\n`);

  return { client, totalDocuments: countResponse.count };
}

/**
 * Phase 2: Find duplicates for one field and reconcile them with earlier fields.
 *
 * Documents already scheduled for deletion by an earlier field are ignored. The oldest
 * remaining document of each group becomes its keeper and is recorded in `keptIds`.
 * A document kept for any group is never scheduled for deletion later. Without this,
 * ties in created_at could make two fields pick different keepers for the same documents
 * and delete every copy.
 *
 * @param seenDeleteIds IDs already scheduled for deletion (updated in place)
 * @param keptIds IDs chosen as keepers so far (updated in place)
 * @returns groups that contribute at least one new deletion, plus the raw group count
 */
async function phase2_FindDuplicatesForField(
  client: Client,
  field: string,
  batchSize: number,
  seenDeleteIds: Set<string>,
  keptIds: Set<string> = new Set<string>()
): Promise<{ duplicates: DuplicateGroup[]; totalFound: number }> {
  console.log(`\n🔍 Phase 2: Checking duplicates for field: ${field}...`);

  const fieldDuplicates = await findDuplicatesForField(client, field, batchSize);
  const duplicates: DuplicateGroup[] = [];

  for (const dup of fieldDuplicates) {
    const survivors = dup.documentIds.filter(id => !seenDeleteIds.has(id));
    const keeper = survivors[0];
    if (keeper === undefined) continue;

    keptIds.add(keeper);
    const newDeleteIds = survivors.slice(1).filter(id => !keptIds.has(id));
    if (newDeleteIds.length === 0) continue;

    dup.keepId = keeper;
    dup.deleteIds = newDeleteIds;
    for (const id of newDeleteIds) {
      seenDeleteIds.add(id);
    }
    duplicates.push(dup);
  }

  console.log(` Found ${fieldDuplicates.length} duplicate groups for ${field}`);
  console.log(` New unique documents to delete: ${duplicates.reduce((sum, dup) => sum + dup.deleteIds.length, 0)}`);

  if (tryGarbageCollect()) {
    console.log(` ♻️ Memory freed after processing ${field}`);
  }

  return { duplicates, totalFound: fieldDuplicates.length };
}

/**
 * Phase 3: Delete one batch of documents with a single bulk request.
 *
 * Only items Elasticsearch reports as `deleted` are counted as deleted, and items
 * carrying an `error` count as errors. Items reported as `not_found` count as neither.
 * If the whole request fails, the entire batch is counted as errors.
 */
async function phase3_DeleteBatch(
  client: Client,
  deleteIds: string[],
  batchSize: number,
  startIndex: number
): Promise<{ deleted: number; errors: number }> {
  const batch = deleteIds.slice(startIndex, startIndex + batchSize);
  let deleted = 0;
  let errors = 0;

  if (batch.length === 0) {
    return { deleted, errors };
  }

  try {
    const bulkResponse = await client.bulk({
      operations: batch.map(id => ({ delete: { _index: INDEX_NAME, _id: id } })),
      refresh: false
    });

    for (const item of bulkResponse.items) {
      const outcome = item.delete;
      if (outcome?.error) {
        errors++;
      } else if (outcome?.result === 'deleted') {
        deleted++;
      }
    }
  } catch (error) {
    console.error(`\n❌ Error deleting batch:`, error);
    errors += batch.length;
  }

  tryGarbageCollect();

  return { deleted, errors };
}

/**
 * Phase 4: Refresh the index so the new count is visible, then report the results.
 */
async function phase4_Finalize(
  client: Client,
  totalDeleted: number,
  totalErrors: number,
  initialDocumentCount: number
): Promise<void> {
  console.log('\n\n🔄 Phase 4: Refreshing index...');
  await client.indices.refresh({ index: INDEX_NAME });
  const newCountResponse = await client.count({ index: INDEX_NAME });

  console.log('\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━');
  console.log('✅ Duplicate removal complete!');
  console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
  console.log(`Documents deleted: ${totalDeleted}`);
  console.log(`Errors: ${totalErrors}`);
  console.log(`Previous document count: ${initialDocumentCount}`);
  console.log(`New document count: ${newCountResponse.count}`);
  console.log('');
}

/**
 * Runs all phases: connect, find duplicates field by field, report, and (unless
 * dry-run) delete them in batches. Any error is logged and exits with status 1.
 */
async function removeDuplicates(parsedArgs: ParsedArgs): Promise<void> {
  const fields = parsedArgs.field ? [parsedArgs.field] : [...VALID_FIELDS];

  console.log(`Mode: ${parsedArgs.dryRun ? '🔎 DRY RUN (no changes)' : '⚠️ EXECUTE (will delete)'}`);
  console.log(`Batch size: ${parsedArgs.batchSize}`);
  console.log(`Fields to check: ${fields.join(', ')}`);
  console.log('');

  try {
    // === PHASE 1: Initialize ===
    const { client, totalDocuments } = await phase1_InitAndConnect();

    if (tryGarbageCollect()) {
      console.log('♻️ Memory freed after initialization\n');
    }

    // === PHASE 2: Find duplicates field by field ===
    const allDuplicates: DuplicateGroup[] = [];
    const seenDeleteIds = new Set<string>();
    const keptIds = new Set<string>();

    for (const field of fields) {
      const { duplicates } = await phase2_FindDuplicatesForField(
        client,
        field,
        parsedArgs.batchSize,
        seenDeleteIds,
        keptIds
      );
      allDuplicates.push(...duplicates);
    }

    const totalToDelete = allDuplicates.reduce((sum, dup) => sum + dup.deleteIds.length, 0);

    console.log(`\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
    console.log(`📋 Summary:`);
    console.log(` Duplicate groups found: ${allDuplicates.length}`);
    console.log(` Documents to delete: ${totalToDelete}`);
    console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n`);

    if (allDuplicates.length === 0) {
      console.log('✨ No duplicates found! Index is clean.\n');
      return;
    }

    // Show a sample of the duplicates
    console.log(`📝 Sample duplicates (showing first 10):\n`);
    for (const dup of allDuplicates.slice(0, 10)) {
      const truncatedValue = dup.value.length > 50 ? dup.value.substring(0, 50) + '...' : dup.value;
      console.log(` Field: ${dup.field}`);
      console.log(` Value: ${truncatedValue}`);
      console.log(` Keep: ${dup.keepId}`);
      console.log(` Delete: ${dup.deleteIds.length} document(s)`);
      console.log('');
    }

    if (allDuplicates.length > 10) {
      console.log(` ... and ${allDuplicates.length - 10} more duplicate groups\n`);
    }

    if (parsedArgs.dryRun) {
      console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
      console.log(`🔎 DRY RUN - No changes made`);
      console.log(` Run with --execute to remove ${totalToDelete} duplicate documents`);
      console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n`);
      return;
    }

    // === PHASE 3: Execute deletion in batches ===
    console.log(`\n🗑️ Phase 3: Removing ${totalToDelete} duplicate documents...\n`);

    let totalDeleted = 0;
    let totalErrors = 0;
    const deleteIds = allDuplicates.flatMap(dup => dup.deleteIds);

    // Release the group objects; only the flat ID list is needed from here on.
    allDuplicates.length = 0;

    for (let i = 0; i < deleteIds.length; i += parsedArgs.batchSize) {
      const { deleted, errors } = await phase3_DeleteBatch(client, deleteIds, parsedArgs.batchSize, i);
      totalDeleted += deleted;
      totalErrors += errors;

      process.stdout.write(
        `\r⏳ Progress: ${Math.min(i + parsedArgs.batchSize, deleteIds.length)}/${deleteIds.length} - ` +
          `Deleted: ${totalDeleted}, Errors: ${totalErrors}`
      );
    }

    deleteIds.length = 0;
    seenDeleteIds.clear();
    keptIds.clear();

    // === PHASE 4: Finalize ===
    await phase4_Finalize(client, totalDeleted, totalErrors, totalDocuments);
  } catch (error) {
    console.error('\n❌ Error:', error instanceof Error ? error.message : error);
    process.exit(1);
  }
}

// Parse command line arguments
const args = process.argv.slice(2);
const parsedArgs = parseArgs(args);

if (parsedArgs.showHelp) {
  showHelp();
}

// Validate field if provided
if (parsedArgs.field && !VALID_FIELDS.includes(parsedArgs.field)) {
  console.error(`❌ Invalid field: ${parsedArgs.field}`);
  console.error(` Valid fields: ${VALID_FIELDS.join(', ')}`);
  process.exit(1);
}

console.log(`\n🔧 Configuration:`);
console.log(` Mode: ${parsedArgs.dryRun ? 'dry-run' : 'execute'}`);
console.log(` Batch size: ${parsedArgs.batchSize}`);
if (parsedArgs.field) {
  console.log(` Field: ${parsedArgs.field}`);
} else {
  console.log(` Fields: all (${VALID_FIELDS.join(', ')})`);
}
console.log('');

removeDuplicates(parsedArgs).catch(console.error);