From b91d19dc0b30925674e2e0fbb7599df9cf778f5c Mon Sep 17 00:00:00 2001 From: ale Date: Sun, 21 Dec 2025 22:36:31 +0100 Subject: [PATCH] fix memory remove dup Signed-off-by: ale --- scripts/remove-duplicates.ts | 237 +++++++++++++++++++++++++---------- 1 file changed, 168 insertions(+), 69 deletions(-) diff --git a/scripts/remove-duplicates.ts b/scripts/remove-duplicates.ts index 7f2d3d0..aaa3d7a 100644 --- a/scripts/remove-duplicates.ts +++ b/scripts/remove-duplicates.ts @@ -225,50 +225,166 @@ async function findDuplicatesForField( return duplicates; } -async function removeDuplicates(parsedArgs: ParsedArgs) { +/** + * Phase 1: Initialize and connect to Elasticsearch + */ +async function phase1_InitAndConnect() { + console.log(`🔍 Hasher Duplicate Remover - Phase 1: Initialization`); + console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`); + console.log(`Elasticsearch: ${ELASTICSEARCH_NODE}`); + console.log(`Index: ${INDEX_NAME}`); + console.log(''); + const client = new Client({ node: ELASTICSEARCH_NODE }); + + console.log('🔗 Connecting to Elasticsearch...'); + await client.cluster.health({}); + console.log('✅ Connected successfully\n'); + + const countResponse = await client.count({ index: INDEX_NAME }); + console.log(`📊 Total documents in index: ${countResponse.count}\n`); + + return { client, totalDocuments: countResponse.count }; +} + +/** + * Phase 2: Find duplicates for a specific field + */ +async function phase2_FindDuplicatesForField( + client: Client, + field: string, + batchSize: number, + seenDeleteIds: Set<string> +): Promise<{ duplicates: DuplicateGroup[], totalFound: number }> { + console.log(`\n🔍 Phase 2: Checking duplicates for field: ${field}...`); + + const fieldDuplicates = await findDuplicatesForField(client, field, batchSize); + const duplicates: DuplicateGroup[] = []; + + // Filter out already seen delete IDs to avoid counting the same document multiple times + for (const dup of fieldDuplicates) { + const newDeleteIds = dup.deleteIds.filter(id => 
!seenDeleteIds.has(id)); + if (newDeleteIds.length > 0) { + dup.deleteIds = newDeleteIds; + newDeleteIds.forEach(id => seenDeleteIds.add(id)); + duplicates.push(dup); + } + } + + console.log(` Found ${fieldDuplicates.length} duplicate groups for ${field}`); + console.log(` New unique documents to delete: ${duplicates.reduce((sum, dup) => sum + dup.deleteIds.length, 0)}`); + + // Force garbage collection if available + if (global.gc) { + global.gc(); + console.log(` ♻️ Memory freed after processing ${field}`); + } + + return { duplicates, totalFound: fieldDuplicates.length }; +} + +/** + * Phase 3: Process deletion for a batch of duplicates + */ +async function phase3_DeleteBatch( + client: Client, + deleteIds: string[], + batchSize: number, + startIndex: number +): Promise<{ deleted: number, errors: number }> { + const batch = deleteIds.slice(startIndex, startIndex + batchSize); + let deleted = 0; + let errors = 0; + + try { + const bulkOperations = batch.flatMap(id => [ + { delete: { _index: INDEX_NAME, _id: id } } + ]); + + const bulkResponse = await client.bulk({ + operations: bulkOperations, + refresh: false + }); + + if (bulkResponse.errors) { + const errorCount = bulkResponse.items.filter((item: any) => item.delete?.error).length; + errors += errorCount; + deleted += batch.length - errorCount; + } else { + deleted += batch.length; + } + } catch (error) { + console.error(`\n❌ Error deleting batch:`, error); + errors += batch.length; + } + + // Force garbage collection if available + if (global.gc) { + global.gc(); + } + + return { deleted, errors }; +} + +/** + * Phase 4: Finalize and report results + */ +async function phase4_Finalize( + client: Client, + totalDeleted: number, + totalErrors: number, + initialDocumentCount: number +) { + console.log('\n\n🔄 Phase 4: Refreshing index...'); + await client.indices.refresh({ index: INDEX_NAME }); + + const newCountResponse = await client.count({ index: INDEX_NAME }); + + 
console.log('\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'); + console.log('✅ Duplicate removal complete!'); + console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`); + console.log(`Documents deleted: ${totalDeleted}`); + console.log(`Errors: ${totalErrors}`); + console.log(`Previous document count: ${initialDocumentCount}`); + console.log(`New document count: ${newCountResponse.count}`); + console.log(''); +} + +async function removeDuplicates(parsedArgs: ParsedArgs) { const fields = parsedArgs.field ? [parsedArgs.field] : ['plaintext', 'md5', 'sha1', 'sha256', 'sha512']; - console.log(`🔍 Hasher Duplicate Remover`); - console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`); - console.log(`Elasticsearch: ${ELASTICSEARCH_NODE}`); - console.log(`Index: ${INDEX_NAME}`); console.log(`Mode: ${parsedArgs.dryRun ? '🔎 DRY RUN (no changes)' : '⚠️ EXECUTE (will delete)'}`); console.log(`Batch size: ${parsedArgs.batchSize}`); console.log(`Fields to check: ${fields.join(', ')}`); console.log(''); try { - // Test connection - console.log('🔗 Connecting to Elasticsearch...'); - await client.cluster.health({}); - console.log('✅ Connected successfully\n'); - - // Get index stats - const countResponse = await client.count({ index: INDEX_NAME }); - console.log(`📊 Total documents in index: ${countResponse.count}\n`); + // === PHASE 1: Initialize === + const { client, totalDocuments } = await phase1_InitAndConnect(); + + // Force garbage collection after phase 1 + if (global.gc) { + global.gc(); + console.log('♻️ Memory freed after initialization\n'); + } + // === PHASE 2: Find duplicates field by field === const allDuplicates: DuplicateGroup[] = []; const seenDeleteIds = new Set<string>(); - // Find duplicates for each field for (const field of fields) { - console.log(`🔍 Checking duplicates for field: ${field}...`); - const fieldDuplicates = await findDuplicatesForField(client, field, parsedArgs.batchSize); + const { duplicates } = await phase2_FindDuplicatesForField( + client, + field, 
parsedArgs.batchSize, + seenDeleteIds + ); + allDuplicates.push(...duplicates); - // Filter out already seen delete IDs to avoid counting the same document multiple times - for (const dup of fieldDuplicates) { - const newDeleteIds = dup.deleteIds.filter(id => !seenDeleteIds.has(id)); - if (newDeleteIds.length > 0) { - dup.deleteIds = newDeleteIds; - newDeleteIds.forEach(id => seenDeleteIds.add(id)); - allDuplicates.push(dup); - } - } - - console.log(` Found ${fieldDuplicates.length} duplicate groups for ${field}`); + // Clear field duplicates to free memory + duplicates.length = 0; } const totalToDelete = allDuplicates.reduce((sum, dup) => sum + dup.deleteIds.length, 0); @@ -310,57 +426,40 @@ async function removeDuplicates(parsedArgs: ParsedArgs) { return; } - // Execute deletion - console.log(`\n🗑️ Removing ${totalToDelete} duplicate documents...\n`); + // === PHASE 3: Execute deletion in batches === + console.log(`\n🗑️ Phase 3: Removing ${totalToDelete} duplicate documents...\n`); - let deleted = 0; - let errors = 0; + let totalDeleted = 0; + let totalErrors = 0; const deleteIds = allDuplicates.flatMap(dup => dup.deleteIds); - // Delete in batches + // Clear allDuplicates to free memory + allDuplicates.length = 0; + + // Delete in batches with memory management for (let i = 0; i < deleteIds.length; i += parsedArgs.batchSize) { - const batch = deleteIds.slice(i, i + parsedArgs.batchSize); + const { deleted, errors } = await phase3_DeleteBatch( + client, + deleteIds, + parsedArgs.batchSize, + i + ); - try { - const bulkOperations = batch.flatMap(id => [ - { delete: { _index: INDEX_NAME, _id: id } } - ]); + totalDeleted += deleted; + totalErrors += errors; - const bulkResponse = await client.bulk({ - operations: bulkOperations, - refresh: false - }); - - if (bulkResponse.errors) { - const errorCount = bulkResponse.items.filter((item: any) => item.delete?.error).length; - errors += errorCount; - deleted += batch.length - errorCount; - } else { - deleted += 
batch.length; - } - - process.stdout.write(`\r⏳ Progress: ${Math.min(i + parsedArgs.batchSize, deleteIds.length)}/${deleteIds.length} - Deleted: ${deleted}, Errors: ${errors}`); - } catch (error) { - console.error(`\n❌ Error deleting batch:`, error); - errors += batch.length; - } + process.stdout.write( + `\r⏳ Progress: ${Math.min(i + parsedArgs.batchSize, deleteIds.length)}/${deleteIds.length} - ` + + `Deleted: ${totalDeleted}, Errors: ${totalErrors}` + ); } - // Refresh index - console.log('\n\n🔄 Refreshing index...'); - await client.indices.refresh({ index: INDEX_NAME }); + // Clear deleteIds to free memory + deleteIds.length = 0; + seenDeleteIds.clear(); - // Get new count - const newCountResponse = await client.count({ index: INDEX_NAME }); - - console.log('\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'); - console.log('✅ Duplicate removal complete!'); - console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`); - console.log(`Documents deleted: ${deleted}`); - console.log(`Errors: ${errors}`); - console.log(`Previous document count: ${countResponse.count}`); - console.log(`New document count: ${newCountResponse.count}`); - console.log(''); + // === PHASE 4: Finalize === + await phase4_Finalize(client, totalDeleted, totalErrors, totalDocuments); } catch (error) { console.error('\n❌ Error:', error instanceof Error ? error.message : error);