redis migration

Signed-off-by: ale <ale@manalejandro.com>
Este commit está contenido en:
ale
2025-12-15 17:43:08 +01:00
padre da89037125
commit 4d9545d0ec
Se han modificado 18 ficheros con 1218 adiciones y 1065 borrados

Ver fichero

@@ -4,7 +4,7 @@
* Hasher Indexer Script
*
* This script reads a text file with one word/phrase per line and indexes
* all the generated hashes into Elasticsearch.
* all the generated hashes into Redis.
*
* Usage:
* npx tsx scripts/index-file.ts <path-to-file.txt> [options]
@@ -19,14 +19,16 @@
* --help, -h Show this help message
*/
import { Client } from '@elastic/elasticsearch';
import Redis from 'ioredis';
import { createReadStream, existsSync, readFileSync, writeFileSync, unlinkSync, openSync, readSync, closeSync } from 'fs';
import { resolve, basename } from 'path';
import { createInterface } from 'readline';
import crypto from 'crypto';
const ELASTICSEARCH_NODE = process.env.ELASTICSEARCH_NODE || 'http://localhost:9200';
const INDEX_NAME = 'hasher';
const REDIS_HOST = process.env.REDIS_HOST || 'localhost';
const REDIS_PORT = parseInt(process.env.REDIS_PORT || '6379', 10);
const REDIS_PASSWORD = process.env.REDIS_PASSWORD || undefined;
const REDIS_DB = parseInt(process.env.REDIS_DB || '0', 10);
const DEFAULT_BATCH_SIZE = 100;
interface HashDocument {
@@ -87,13 +89,12 @@ function parseArgs(args: string[]): ParsedArgs {
result.batchSize = parsed;
}
} else if (arg === '--batch-size') {
// Support --batch-size <value> format
const nextArg = args[i + 1];
if (nextArg && !nextArg.startsWith('-')) {
const parsed = parseInt(nextArg, 10);
if (!isNaN(parsed) && parsed > 0) {
result.batchSize = parsed;
i++; // Skip next argument
i++;
}
}
} else if (arg.startsWith('--state-file=')) {
@@ -105,7 +106,6 @@ function parseArgs(args: string[]): ParsedArgs {
i++;
}
} else if (!arg.startsWith('-')) {
// Positional argument - treat as file path
result.filePath = arg;
}
}
@@ -113,49 +113,6 @@ function parseArgs(args: string[]): ParsedArgs {
return result;
}
/**
 * Derives a short fingerprint for a file from its metadata only.
 *
 * The fingerprint is the first 8 hex characters of the MD5 of
 * `<path>:<size>:<mtime in ms>`; the file contents are never read.
 *
 * @param filePath Path of the file to fingerprint (passed to `statSync`).
 * @returns An 8-character lowercase hex string.
 */
function getFileHash(filePath: string): string {
  const { size, mtime } = require('fs').statSync(filePath);
  const fingerprintSource = [filePath, size, mtime.getTime()].join(':');
  const fullDigest = crypto.createHash('md5').update(fingerprintSource).digest('hex');
  return fullDigest.slice(0, 8);
}
/**
 * Computes the default location of the resume-state file for an input file.
 *
 * The input's base name has its last extension stripped and is wrapped as
 * `.indexer-state-<name>.json`, resolved against the current working directory.
 *
 * @param filePath Path of the file being indexed.
 * @returns Absolute path of the state file.
 */
function getDefaultStateFile(filePath: string): string {
  const baseName = basename(filePath);
  const stem = baseName.replace(/\.[^.]+$/, '');
  return resolve('.indexer-state-' + stem + '.json');
}
/**
 * Reads a previously saved indexer state from disk.
 *
 * Best effort: a missing file yields `null`, and any read or JSON parse
 * failure is reported with a warning and also yields `null`.
 *
 * @param stateFile Path of the JSON state file.
 * @returns The parsed state, or `null` when none could be loaded.
 */
function loadState(stateFile: string): IndexerState | null {
  try {
    if (!existsSync(stateFile)) {
      return null;
    }
    const rawJson = readFileSync(stateFile, 'utf-8');
    const parsedState = JSON.parse(rawJson) as IndexerState;
    return parsedState;
  } catch (error) {
    console.warn(`⚠️ Could not load state file: ${error}`);
    return null;
  }
}
/**
 * Persists the indexer state as pretty-printed JSON.
 *
 * The passed object is mutated: its `lastUpdate` is refreshed to the current
 * time before serialisation. Write failures are logged and swallowed so that
 * a failed checkpoint never aborts the run.
 *
 * @param stateFile Destination path of the state file.
 * @param state State object to stamp and write.
 */
function saveState(stateFile: string, state: IndexerState): void {
  try {
    // Stamp first so the serialised JSON carries the fresh timestamp.
    state.lastUpdate = new Date().toISOString();
    const serialized = JSON.stringify(state, null, 2);
    writeFileSync(stateFile, serialized, 'utf-8');
  } catch (error) {
    console.error(`❌ Could not save state file: ${error}`);
  }
}
/**
 * Removes the state file if it is present.
 *
 * A missing file is not an error; a failed removal is only warned about.
 *
 * @param stateFile Path of the state file to remove.
 */
function deleteState(stateFile: string): void {
  try {
    if (!existsSync(stateFile)) {
      return;
    }
    unlinkSync(stateFile);
  } catch (error) {
    console.warn(`⚠️ Could not delete state file: ${error}`);
  }
}
function generateHashes(plaintext: string): HashDocument {
return {
plaintext,
@@ -185,74 +142,169 @@ Options:
--help, -h Show this help message
Environment Variables:
ELASTICSEARCH_NODE Elasticsearch node URL (default: http://localhost:9200)
REDIS_HOST Redis host (default: localhost)
REDIS_PORT Redis port (default: 6379)
REDIS_PASSWORD Redis password (optional)
REDIS_DB Redis database number (default: 0)
Examples:
npx tsx scripts/index-file.ts wordlist.txt
npx tsx scripts/index-file.ts wordlist.txt --batch-size=500
npx tsx scripts/index-file.ts wordlist.txt --batch-size 500
npx tsx scripts/index-file.ts wordlist.txt --no-resume
npx tsx scripts/index-file.ts wordlist.txt --no-check
npm run index-file -- wordlist.txt --batch-size=500 --no-check
# Index a file with default settings
npm run index-file -- wordlist.txt
State Management:
The script automatically saves progress to a state file. If interrupted,
it will resume from where it left off on the next run. Use --no-resume
to start fresh.
# Index with custom batch size
npm run index-file -- wordlist.txt --batch-size=500
Duplicate Checking:
By default, the script checks if each plaintext or hash already exists
in the index before inserting. Use --no-check to skip this verification
for faster indexing (useful when you're sure there are no duplicates).
# Start fresh (ignore previous state)
npm run index-file -- wordlist.txt --no-resume
# Skip duplicate checking for speed
npm run index-file -- wordlist.txt --no-check
`);
process.exit(0);
}
async function indexFile(filePath: string, batchSize: number, shouldResume: boolean, checkDuplicates: boolean, customStateFile: string | null) {
const client = new Client({ node: ELASTICSEARCH_NODE });
const absolutePath = resolve(filePath);
const stateFile = customStateFile || getDefaultStateFile(absolutePath);
const fileHash = getFileHash(absolutePath);
/**
 * Computes the SHA-256 digest of a file's contents.
 *
 * The file is read synchronously in fixed-size chunks instead of being loaded
 * into a single buffer, so memory use stays constant and wordlists larger
 * than Node's maximum buffer size can still be hashed.
 *
 * @param filePath Path of the file to hash.
 * @returns Lowercase hex SHA-256 digest of the file contents.
 * @throws If the file cannot be opened or read.
 */
function computeFileHash(filePath: string): string {
  const hashSum = crypto.createHash('sha256');
  const chunk = Buffer.allocUnsafe(1024 * 1024);
  const fd = openSync(filePath, 'r');
  try {
    for (;;) {
      // A null position reads from (and advances) the current file offset.
      const bytesRead = readSync(fd, chunk, 0, chunk.length, null);
      if (bytesRead === 0) {
        break;
      }
      // update() consumes the data synchronously, so reusing the buffer is safe.
      hashSum.update(chunk.subarray(0, bytesRead));
    }
  } finally {
    closeSync(fd);
  }
  return hashSum.digest('hex');
}
// State management
let state: IndexerState = {
filePath: absolutePath,
fileHash,
lastProcessedLine: 0,
totalLines: 0,
indexed: 0,
skipped: 0,
errors: 0,
startTime: Date.now(),
lastUpdate: new Date().toISOString()
};
/**
 * Decides where the resume-state file lives.
 *
 * A non-empty custom path wins; otherwise the name is derived from the input
 * file's base name (extension included) as `.indexer-state-<basename>.json`.
 * Either way the result is resolved against the current working directory.
 *
 * @param filePath Path of the file being indexed.
 * @param customPath Optional user-supplied state file path.
 * @returns Absolute path of the state file.
 */
function getStateFilePath(filePath: string, customPath: string | null): string {
  const target = customPath ? customPath : `.indexer-state-${basename(filePath)}.json`;
  return resolve(target);
}
// Check for existing state
const existingState = loadState(stateFile);
let resumingFrom = 0;
if (shouldResume && existingState) {
if (existingState.fileHash === fileHash) {
state = existingState;
resumingFrom = state.lastProcessedLine;
state.startTime = Date.now(); // Reset start time for this session
console.log(`📂 Found existing state, resuming from line ${resumingFrom}`);
} else {
console.log(`⚠️ File has changed since last run, starting fresh`);
deleteState(stateFile);
}
} else if (!shouldResume) {
deleteState(stateFile);
function loadState(stateFilePath: string): IndexerState | null {
if (!existsSync(stateFilePath)) {
return null;
}
console.log(`📚 Hasher Indexer`);
console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
console.log(`Elasticsearch: ${ELASTICSEARCH_NODE}`);
console.log(`Index: ${INDEX_NAME}`);
try {
const data = readFileSync(stateFilePath, 'utf-8');
return JSON.parse(data);
} catch (error) {
console.warn(`⚠️ Could not load state file: ${error}`);
return null;
}
}
/**
 * Writes the indexer state to disk as pretty-printed (2-space) JSON.
 *
 * The state object is written as given. A write failure is logged and
 * swallowed rather than propagated.
 *
 * @param stateFilePath Destination path of the state file.
 * @param state State object to serialise.
 */
function saveState(stateFilePath: string, state: IndexerState): void {
  try {
    const serialized = JSON.stringify(state, null, 2);
    writeFileSync(stateFilePath, serialized, 'utf-8');
  } catch (error) {
    console.error(`❌ Could not save state file: ${error}`);
  }
}
/**
 * Deletes the state file when one exists.
 *
 * Nothing happens if the file is absent; if removal fails, a warning is
 * printed and execution continues.
 *
 * @param stateFilePath Path of the state file to delete.
 */
function deleteState(stateFilePath: string): void {
  try {
    const present = existsSync(stateFilePath);
    if (!present) {
      return;
    }
    unlinkSync(stateFilePath);
  } catch (error) {
    console.warn(`⚠️ Could not delete state file: ${error}`);
  }
}
/**
 * Counts the lines of a text file by streaming it through readline.
 *
 * `\r\n` is treated as a single line break (`crlfDelay: Infinity`). A final
 * line without a trailing newline is counted; an empty file yields 0.
 *
 * @param filePath Path of the file to count.
 * @returns Promise resolving to the number of lines.
 * @throws Rejects with the underlying stream error if the file cannot be read.
 */
async function countLines(filePath: string): Promise<number> {
  // Executor parameters are named so they do not shadow path.resolve.
  return new Promise<number>((resolveCount, rejectCount) => {
    let lineCount = 0;
    const input = createReadStream(filePath);
    const rl = createInterface({
      input,
      crlfDelay: Infinity
    });

    // Listen on the stream itself so read failures (ENOENT, EACCES, ...)
    // reject the promise instead of becoming an unhandled 'error' event.
    input.on('error', (err: Error) => {
      rejectCount(err);
      rl.close();
    });

    rl.on('line', () => {
      lineCount++;
    });
    rl.on('close', () => resolveCount(lineCount));
    rl.on('error', rejectCount);
  });
}
async function main() {
const args = process.argv.slice(2);
const parsed = parseArgs(args);
if (parsed.showHelp || !parsed.filePath) {
showHelp();
process.exit(parsed.showHelp ? 0 : 1);
}
const filePath = parsed.filePath!;
const batchSize = parsed.batchSize;
const checkDuplicates = parsed.checkDuplicates;
const absolutePath = resolve(filePath);
if (!existsSync(absolutePath)) {
console.error(`❌ File not found: ${absolutePath}`);
process.exit(1);
}
const stateFile = getStateFilePath(filePath, parsed.stateFile);
const fileHash = computeFileHash(absolutePath);
let state: IndexerState;
let resumingFrom = 0;
if (parsed.resume) {
const loadedState = loadState(stateFile);
if (loadedState && loadedState.fileHash === fileHash) {
state = loadedState;
resumingFrom = state.lastProcessedLine;
console.log(`📂 Resuming from previous state: ${stateFile}`);
} else {
if (loadedState) {
console.log('⚠️ File has changed or state file is from a different file. Starting fresh.');
}
state = {
filePath: absolutePath,
fileHash,
lastProcessedLine: 0,
totalLines: 0,
indexed: 0,
skipped: 0,
errors: 0,
startTime: Date.now(),
lastUpdate: new Date().toISOString()
};
}
} else {
deleteState(stateFile);
state = {
filePath: absolutePath,
fileHash,
lastProcessedLine: 0,
totalLines: 0,
indexed: 0,
skipped: 0,
errors: 0,
startTime: Date.now(),
lastUpdate: new Date().toISOString()
};
}
if (state.totalLines === 0) {
console.log('🔢 Counting lines...');
state.totalLines = await countLines(absolutePath);
}
const client = new Redis({
host: REDIS_HOST,
port: REDIS_PORT,
password: REDIS_PASSWORD,
db: REDIS_DB,
});
console.log('');
console.log('📚 Hasher Indexer');
console.log('━'.repeat(42));
console.log(`Redis: ${REDIS_HOST}:${REDIS_PORT}`);
console.log(`File: ${filePath}`);
console.log(`Batch size: ${batchSize}`);
console.log(`Check duplicates: ${checkDuplicates ? 'yes' : 'no (--no-check)'}`);
console.log(`State file: ${stateFile}`);
console.log(`Duplicate check: ${checkDuplicates ? 'enabled' : 'disabled (--no-check)'}`);
if (resumingFrom > 0) {
console.log(`Resuming from: line ${resumingFrom}`);
console.log(`Already indexed: ${state.indexed}`);
@@ -260,7 +312,6 @@ async function indexFile(filePath: string, batchSize: number, shouldResume: bool
}
console.log('');
// Handle interruption signals
let isInterrupted = false;
const handleInterrupt = () => {
if (isInterrupted) {
@@ -272,7 +323,6 @@ async function indexFile(filePath: string, batchSize: number, shouldResume: bool
saveState(stateFile, state);
console.log(`💾 State saved to ${stateFile}`);
console.log(` Resume with: npx tsx scripts/index-file.ts ${filePath}`);
console.log(` Or start fresh with: npx tsx scripts/index-file.ts ${filePath} --no-resume`);
process.exit(0);
};
@@ -280,13 +330,11 @@ async function indexFile(filePath: string, batchSize: number, shouldResume: bool
process.on('SIGTERM', handleInterrupt);
try {
// Test connection
console.log('🔗 Connecting to Elasticsearch...');
await client.cluster.health({});
console.log('🔗 Connecting to Redis...');
await client.ping();
console.log('✅ Connected successfully\n');
// Process file line by line using streams
console.log('📖 Processing file...\n');
console.log('📖 Reading file...\n');
let currentLineNumber = 0;
let currentBatch: string[] = [];
@@ -301,219 +349,128 @@ async function indexFile(filePath: string, batchSize: number, shouldResume: bool
crlfDelay: Infinity
});
const processBatch = async (batch: string[], lineNumber: number) => {
if (batch.length === 0) return;
if (isInterrupted) return;
const processBatch = async (batch: string[]) => {
if (batch.length === 0 || isInterrupted) return;
const bulkOperations: any[] = [];
const batchWithHashes = batch.map(plaintext => generateHashes(plaintext));
// Generate hashes for all items in batch first
const batchWithHashes = batch.map((plaintext: string) => ({
plaintext,
hashes: generateHashes(plaintext)
}));
let toIndex = batchWithHashes;
if (checkDuplicates) {
// Check which items already exist (by plaintext or any hash)
const md5List = batchWithHashes.map((item: any) => item.hashes.md5);
const sha1List = batchWithHashes.map((item: any) => item.hashes.sha1);
const sha256List = batchWithHashes.map((item: any) => item.hashes.sha256);
const sha512List = batchWithHashes.map((item: any) => item.hashes.sha512);
const existenceChecks = await Promise.all(
batchWithHashes.map(doc => client.exists(`hash:plaintext:${doc.plaintext}`))
);
const existingCheck = await client.search({
index: INDEX_NAME,
size: batchSize * 5,
query: {
bool: {
should: [
{ terms: { 'plaintext.keyword': batch } },
{ terms: { md5: md5List } },
{ terms: { sha1: sha1List } },
{ terms: { sha256: sha256List } },
{ terms: { sha512: sha512List } },
],
minimum_should_match: 1
}
},
_source: ['plaintext', 'md5', 'sha1', 'sha256', 'sha512']
});
const newDocs = batchWithHashes.filter((_doc, idx) => existenceChecks[idx] === 0);
const existingCount = batchWithHashes.length - newDocs.length;
// Create a set of existing hashes for quick lookup
const existingHashes = new Set<string>();
existingCheck.hits.hits.forEach((hit: any) => {
const src = hit._source;
existingHashes.add(src.plaintext);
existingHashes.add(src.md5);
existingHashes.add(src.sha1);
existingHashes.add(src.sha256);
existingHashes.add(src.sha512);
});
state.skipped += existingCount;
sessionSkipped += existingCount;
toIndex = newDocs;
}
// Prepare bulk operations only for items that don't have any duplicate hash
for (const item of batchWithHashes) {
const isDuplicate =
existingHashes.has(item.plaintext) ||
existingHashes.has(item.hashes.md5) ||
existingHashes.has(item.hashes.sha1) ||
existingHashes.has(item.hashes.sha256) ||
existingHashes.has(item.hashes.sha512);
if (toIndex.length > 0) {
const pipeline = client.pipeline();
if (!isDuplicate) {
bulkOperations.push({ index: { _index: INDEX_NAME } });
bulkOperations.push(item.hashes);
} else {
state.skipped++;
sessionSkipped++;
}
for (const doc of toIndex) {
const key = `hash:plaintext:${doc.plaintext}`;
pipeline.set(key, JSON.stringify(doc));
pipeline.set(`hash:index:md5:${doc.md5}`, doc.plaintext);
pipeline.set(`hash:index:sha1:${doc.sha1}`, doc.plaintext);
pipeline.set(`hash:index:sha256:${doc.sha256}`, doc.plaintext);
pipeline.set(`hash:index:sha512:${doc.sha512}`, doc.plaintext);
pipeline.hincrby('hash:stats', 'count', 1);
pipeline.hincrby('hash:stats', 'size', JSON.stringify(doc).length);
}
} else {
// No duplicate checking - index everything
for (const item of batchWithHashes) {
bulkOperations.push({ index: { _index: INDEX_NAME } });
bulkOperations.push(item.hashes);
const results = await pipeline.exec();
const errorCount = results?.filter(([err]) => err !== null).length || 0;
if (errorCount > 0) {
state.errors += errorCount;
sessionErrors += errorCount;
const successCount = toIndex.length - errorCount;
state.indexed += successCount;
sessionIndexed += successCount;
} else {
state.indexed += toIndex.length;
sessionIndexed += toIndex.length;
}
}
// Execute bulk operation only if there are new items to insert
if (bulkOperations.length > 0) {
try {
const bulkResponse = await client.bulk({
operations: bulkOperations,
refresh: false
});
state.lastUpdate = new Date().toISOString();
const progress = ((state.lastProcessedLine / state.totalLines) * 100).toFixed(1);
process.stdout.write(
`\r⏳ Progress: ${state.lastProcessedLine}/${state.totalLines} (${progress}%) - ` +
`Indexed: ${sessionIndexed}, Skipped: ${sessionSkipped}, Errors: ${sessionErrors} `
);
if (bulkResponse.errors) {
const errorCount = bulkResponse.items.filter((item: any) => item.index?.error).length;
state.errors += errorCount;
sessionErrors += errorCount;
const successCount = (bulkOperations.length / 2) - errorCount;
state.indexed += successCount;
sessionIndexed += successCount;
} else {
const count = bulkOperations.length / 2;
state.indexed += count;
sessionIndexed += count;
}
} catch (error) {
console.error(`\n❌ Error processing batch:`, error);
const count = bulkOperations.length / 2;
state.errors += count;
sessionErrors += count;
}
}
// Update state
state.lastProcessedLine = lineNumber;
state.totalLines = lineNumber;
// Save state periodically (every 10 batches)
if (lineNumber % (batchSize * 10) === 0) {
saveState(stateFile, state);
}
// Progress indicator
const elapsed = ((Date.now() - sessionStartTime) / 1000).toFixed(0);
process.stdout.write(`\r⏳ Line: ${lineNumber} | Session: +${sessionIndexed} indexed, +${sessionSkipped} skipped | Total: ${state.indexed} indexed | Time: ${elapsed}s`);
saveState(stateFile, state);
};
for await (const line of rl) {
if (isInterrupted) break;
currentLineNumber++;
// Skip already processed lines
if (currentLineNumber <= resumingFrom) {
continue;
}
const trimmedLine = line.trim();
if (trimmedLine.length > 0) {
// Only take first word (no spaces or separators)
const firstWord = trimmedLine.split(/\s+/)[0];
if (firstWord) {
currentBatch.push(firstWord);
if (isInterrupted) break;
if (currentBatch.length >= batchSize) {
await processBatch(currentBatch, currentLineNumber);
currentBatch = [];
}
}
const trimmed = line.trim();
if (!trimmed) continue;
currentBatch.push(trimmed);
state.lastProcessedLine = currentLineNumber;
if (currentBatch.length >= batchSize) {
await processBatch(currentBatch);
currentBatch = [];
}
}
// Process remaining items in last batch
if (currentBatch.length > 0 && !isInterrupted) {
await processBatch(currentBatch, currentLineNumber);
await processBatch(currentBatch);
}
if (isInterrupted) {
return;
console.log('\n');
if (!isInterrupted) {
const totalTime = ((Date.now() - sessionStartTime) / 1000).toFixed(2);
const rate = (sessionIndexed / parseFloat(totalTime)).toFixed(2);
console.log('━'.repeat(42));
console.log('✅ Indexing complete!');
console.log('');
console.log('📊 Session Statistics:');
console.log(` Indexed: ${sessionIndexed}`);
console.log(` Skipped: ${sessionSkipped}`);
console.log(` Errors: ${sessionErrors}`);
console.log(` Time: ${totalTime}s`);
console.log(` Rate: ${rate} docs/sec`);
console.log('');
console.log('📈 Total Statistics:');
console.log(` Total indexed: ${state.indexed}`);
console.log(` Total skipped: ${state.skipped}`);
console.log(` Total errors: ${state.errors}`);
console.log('');
deleteState(stateFile);
}
// Refresh index
console.log('\n\n🔄 Refreshing index...');
await client.indices.refresh({ index: INDEX_NAME });
// Delete state file on successful completion
deleteState(stateFile);
const duration = ((Date.now() - sessionStartTime) / 1000).toFixed(2);
const rate = sessionIndexed > 0 ? (sessionIndexed / parseFloat(duration)).toFixed(0) : '0';
console.log('\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━');
console.log('✅ Indexing complete!');
console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
console.log(`Total lines processed: ${currentLineNumber}`);
if (resumingFrom > 0) {
console.log(`Lines skipped (resumed): ${resumingFrom}`);
console.log(`Lines processed this session: ${currentLineNumber - resumingFrom}`);
}
console.log(`Successfully indexed (total): ${state.indexed}`);
console.log(`Successfully indexed (session): ${sessionIndexed}`);
console.log(`Skipped duplicates (total): ${state.skipped}`);
console.log(`Skipped duplicates (session): ${sessionSkipped}`);
console.log(`Errors (total): ${state.errors}`);
console.log(`Session duration: ${duration}s`);
console.log(`Session rate: ${rate} docs/sec`);
console.log('');
await client.quit();
} catch (error) {
// Save state on error
console.error('\n\n❌ Error:', error);
saveState(stateFile, state);
console.error(`\n💾 State saved to ${stateFile}`);
console.error('❌ Error:', error instanceof Error ? error.message : error);
console.log(`💾 State saved to ${stateFile}`);
await client.quit();
process.exit(1);
} finally {
// Remove signal handlers
process.removeListener('SIGINT', handleInterrupt);
process.removeListener('SIGTERM', handleInterrupt);
}
}
// Parse command line arguments
const args = process.argv.slice(2);
const parsedArgs = parseArgs(args);
if (parsedArgs.showHelp || !parsedArgs.filePath) {
showHelp();
}
const filePath = parsedArgs.filePath as string;
// Validate file exists
if (!existsSync(filePath)) {
console.error(`❌ File not found: ${filePath}`);
process.exit(1);
}
console.log(`\n🔧 Configuration:`);
console.log(` File: ${filePath}`);
console.log(` Batch size: ${parsedArgs.batchSize}`);
console.log(` Resume: ${parsedArgs.resume}`);
console.log(` Check duplicates: ${parsedArgs.checkDuplicates}`);
if (parsedArgs.stateFile) {
console.log(` State file: ${parsedArgs.stateFile}`);
}
console.log('');
indexFile(filePath, parsedArgs.batchSize, parsedArgs.resume, parsedArgs.checkDuplicates, parsedArgs.stateFile).catch(console.error);
main();

Ver fichero

@@ -3,7 +3,7 @@
/**
* Hasher Duplicate Remover Script
*
* This script finds and removes duplicate entries from the Elasticsearch index.
* This script finds and removes duplicate entries from Redis.
* It identifies duplicates by checking plaintext, md5, sha1, sha256, and sha512 fields.
*
* Usage:
@@ -13,17 +13,28 @@
* Options:
* --dry-run Show duplicates without removing them (default)
* --execute Actually remove the duplicates
* --batch-size=<number> Number of items to process in each batch (default: 1000)
* --field=<field> Check duplicates only on this field (plaintext, md5, sha1, sha256, sha512)
* --batch-size=<number> Number of keys to scan in each batch (default: 1000)
* --field=<field> Check duplicates only on this field (md5, sha1, sha256, sha512)
* --help, -h Show this help message
*/
import { Client } from '@elastic/elasticsearch';
import Redis from 'ioredis';
const ELASTICSEARCH_NODE = process.env.ELASTICSEARCH_NODE || 'http://localhost:9200';
const INDEX_NAME = 'hasher';
const REDIS_HOST = process.env.REDIS_HOST || 'localhost';
const REDIS_PORT = parseInt(process.env.REDIS_PORT || '6379', 10);
const REDIS_PASSWORD = process.env.REDIS_PASSWORD || undefined;
const REDIS_DB = parseInt(process.env.REDIS_DB || '0', 10);
const DEFAULT_BATCH_SIZE = 1000;
/**
 * Shape this script expects when it JSON-parses the value stored under a
 * `hash:plaintext:<plaintext>` key.
 */
interface HashDocument {
  /** Original text; also used as the suffix of the document's own key. */
  plaintext: string;
  /** MD5 value; used as the suffix of the `hash:index:md5:<value>` key. */
  md5: string;
  /** SHA-1 value; used as the suffix of the `hash:index:sha1:<value>` key. */
  sha1: string;
  /** SHA-256 value; used as the suffix of the `hash:index:sha256:<value>` key. */
  sha256: string;
  /** SHA-512 value; used as the suffix of the `hash:index:sha512:<value>` key. */
  sha512: string;
  /**
   * Creation timestamp, compared as a string to decide which duplicate is
   * oldest. NOTE(review): presumably an ISO-8601 string so that string order
   * matches time order — confirm against the writer.
   */
  created_at: string;
}
interface ParsedArgs {
dryRun: boolean;
batchSize: number;
@@ -34,9 +45,9 @@ interface ParsedArgs {
interface DuplicateGroup {
value: string;
field: string;
documentIds: string[];
keepId: string;
deleteIds: string[];
plaintexts: string[];
keepPlaintext: string;
deletePlaintexts: string[];
}
function parseArgs(args: string[]): ParsedArgs {
@@ -96,302 +107,244 @@ Usage:
Options:
--dry-run Show duplicates without removing them (default)
--execute Actually remove the duplicates
--batch-size=<number> Number of items to process in each batch (default: 1000)
--batch-size=<number> Number of keys to scan in each batch (default: 1000)
--field=<field> Check duplicates only on this field
Valid fields: plaintext, md5, sha1, sha256, sha512
Valid fields: md5, sha1, sha256, sha512
--help, -h Show this help message
Environment Variables:
ELASTICSEARCH_NODE Elasticsearch node URL (default: http://localhost:9200)
REDIS_HOST Redis host (default: localhost)
REDIS_PORT Redis port (default: 6379)
REDIS_PASSWORD Redis password (optional)
REDIS_DB Redis database number (default: 0)
Examples:
npx tsx scripts/remove-duplicates.ts # Dry run, show all duplicates
npx tsx scripts/remove-duplicates.ts --execute # Remove all duplicates
npx tsx scripts/remove-duplicates.ts --field=md5 # Check only md5 duplicates
npx tsx scripts/remove-duplicates.ts --execute --field=plaintext
# Dry run (show duplicates only)
npm run remove-duplicates
Notes:
- The script keeps the OLDEST document (by created_at) and removes newer duplicates
- Always run with --dry-run first to review what will be deleted
- Duplicates are checked across all hash fields by default
# Actually remove duplicates
npm run remove-duplicates -- --execute
# Check only MD5 duplicates
npm run remove-duplicates -- --field=md5 --execute
Description:
This script scans through all hash documents in Redis and identifies
duplicates based on hash values. When duplicates are found, it keeps
the oldest entry (by created_at) and marks the rest for deletion.
`);
process.exit(0);
}
async function findDuplicatesForField(
client: Client,
field: string,
client: Redis,
field: 'md5' | 'sha1' | 'sha256' | 'sha512',
batchSize: number
): Promise<DuplicateGroup[]> {
const duplicates: DuplicateGroup[] = [];
// Use aggregation to find duplicate values
const fieldToAggregate = field === 'plaintext' ? 'plaintext.keyword' : field;
// Use composite aggregation to handle large number of duplicates
let afterKey: any = undefined;
let hasMore = true;
console.log(` Scanning for duplicates...`);
while (hasMore) {
const aggQuery: any = {
index: INDEX_NAME,
size: 0,
aggs: {
duplicates: {
composite: {
size: batchSize,
sources: [
{ value: { terms: { field: fieldToAggregate } } }
],
...(afterKey && { after: afterKey })
},
aggs: {
doc_count_filter: {
bucket_selector: {
buckets_path: { count: '_count' },
script: 'params.count > 1'
}
}
}
}
}
};
const pattern = `hash:index:${field}:*`;
const hashToPlaintexts: Map<string, string[]> = new Map();
const response = await client.search(aggQuery);
const compositeAgg = response.aggregations?.duplicates as any;
const buckets = compositeAgg?.buckets || [];
console.log(`🔍 Scanning ${field} indexes...`);
for (const bucket of buckets) {
if (bucket.doc_count > 1) {
const value = bucket.key.value;
// Use scroll API for large result sets
const documentIds: string[] = [];
let scrollResponse = await client.search({
index: INDEX_NAME,
scroll: '1m',
size: 1000,
query: {
term: {
[fieldToAggregate]: value
}
},
sort: [
{ created_at: { order: 'asc' } }
],
_source: false
});
let cursor = '0';
let keysScanned = 0;
while (scrollResponse.hits.hits.length > 0) {
documentIds.push(...scrollResponse.hits.hits.map((hit: any) => hit._id));
if (!scrollResponse._scroll_id) break;
scrollResponse = await client.scroll({
scroll_id: scrollResponse._scroll_id,
scroll: '1m'
});
}
do {
const [nextCursor, keys] = await client.scan(cursor, 'MATCH', pattern, 'COUNT', batchSize);
cursor = nextCursor;
keysScanned += keys.length;
// Clear scroll
if (scrollResponse._scroll_id) {
await client.clearScroll({ scroll_id: scrollResponse._scroll_id }).catch(() => {});
}
if (documentIds.length > 1) {
duplicates.push({
value: String(value),
field,
documentIds,
keepId: documentIds[0], // Keep the oldest
deleteIds: documentIds.slice(1) // Delete the rest
});
for (const key of keys) {
const hash = key.replace(`hash:index:${field}:`, '');
const plaintext = await client.get(key);
if (plaintext) {
if (!hashToPlaintexts.has(hash)) {
hashToPlaintexts.set(hash, []);
}
hashToPlaintexts.get(hash)!.push(plaintext);
}
}
// Check if there are more results
afterKey = compositeAgg?.after_key;
hasMore = buckets.length === batchSize && afterKey;
if (hasMore) {
process.stdout.write(`\r Found ${duplicates.length} duplicate groups so far...`);
process.stdout.write(`\r Keys scanned: ${keysScanned} `);
} while (cursor !== '0');
console.log('');
const duplicates: DuplicateGroup[] = [];
for (const [hash, plaintexts] of hashToPlaintexts.entries()) {
if (plaintexts.length > 1) {
// Fetch documents to get created_at timestamps
const docs = await Promise.all(
plaintexts.map(async (pt) => {
const data = await client.get(`hash:plaintext:${pt}`);
return data ? JSON.parse(data) as HashDocument : null;
})
);
const validDocs = docs.filter((doc): doc is HashDocument => doc !== null);
if (validDocs.length > 1) {
// Sort by created_at, keep oldest
validDocs.sort((a, b) => a.created_at.localeCompare(b.created_at));
duplicates.push({
value: hash,
field,
plaintexts: validDocs.map(d => d.plaintext),
keepPlaintext: validDocs[0].plaintext,
deletePlaintexts: validDocs.slice(1).map(d => d.plaintext)
});
}
}
}
return duplicates;
}
async function removeDuplicates(parsedArgs: ParsedArgs) {
const client = new Client({ node: ELASTICSEARCH_NODE });
const fields = parsedArgs.field
? [parsedArgs.field]
: ['plaintext', 'md5', 'sha1', 'sha256', 'sha512'];
async function removeDuplicates(
client: Redis,
duplicates: DuplicateGroup[],
dryRun: boolean
): Promise<{ deleted: number; errors: number }> {
let deleted = 0;
let errors = 0;
console.log(`🔍 Hasher Duplicate Remover`);
console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
console.log(`Elasticsearch: ${ELASTICSEARCH_NODE}`);
console.log(`Index: ${INDEX_NAME}`);
console.log(`Mode: ${parsedArgs.dryRun ? '🔎 DRY RUN (no changes)' : '⚠️ EXECUTE (will delete)'}`);
console.log(`Batch size: ${parsedArgs.batchSize}`);
console.log(`Fields to check: ${fields.join(', ')}`);
console.log('');
console.log(`${dryRun ? '🔍 DRY RUN - Would delete:' : '🗑️ Deleting duplicates...'}`);
console.log('');
for (const dup of duplicates) {
console.log(`Duplicate ${dup.field}: ${dup.value}`);
console.log(` Keep: ${dup.keepPlaintext} (oldest)`);
console.log(` Delete: ${dup.deletePlaintexts.join(', ')}`);
if (!dryRun) {
for (const plaintext of dup.deletePlaintexts) {
try {
const docKey = `hash:plaintext:${plaintext}`;
const docData = await client.get(docKey);
if (docData) {
const doc: HashDocument = JSON.parse(docData);
const pipeline = client.pipeline();
// Delete the main document
pipeline.del(docKey);
// Delete all indexes
pipeline.del(`hash:index:md5:${doc.md5}`);
pipeline.del(`hash:index:sha1:${doc.sha1}`);
pipeline.del(`hash:index:sha256:${doc.sha256}`);
pipeline.del(`hash:index:sha512:${doc.sha512}`);
// Update statistics
pipeline.hincrby('hash:stats', 'count', -1);
pipeline.hincrby('hash:stats', 'size', -JSON.stringify(doc).length);
const results = await pipeline.exec();
if (results && results.some(([err]) => err !== null)) {
errors++;
} else {
deleted++;
}
}
} catch (error) {
console.error(` Error deleting ${plaintext}:`, error);
errors++;
}
}
}
console.log('');
}
return { deleted, errors };
}
async function main() {
const args = process.argv.slice(2);
const parsed = parseArgs(args);
if (parsed.showHelp) {
showHelp();
process.exit(0);
}
const validFields: Array<'md5' | 'sha1' | 'sha256' | 'sha512'> = ['md5', 'sha1', 'sha256', 'sha512'];
const fieldsToCheck = parsed.field
? [parsed.field as 'md5' | 'sha1' | 'sha256' | 'sha512']
: validFields;
// Validate field
if (parsed.field && !validFields.includes(parsed.field as any)) {
console.error(`❌ Invalid field: ${parsed.field}`);
console.error(` Valid fields: ${validFields.join(', ')}`);
process.exit(1);
}
const client = new Redis({
host: REDIS_HOST,
port: REDIS_PORT,
password: REDIS_PASSWORD,
db: REDIS_DB,
});
console.log('');
console.log('🔍 Hasher Duplicate Remover');
console.log('━'.repeat(42));
console.log(`Redis: ${REDIS_HOST}:${REDIS_PORT}`);
console.log(`Mode: ${parsed.dryRun ? 'DRY RUN' : 'EXECUTE'}`);
console.log(`Batch size: ${parsed.batchSize}`);
console.log(`Fields to check: ${fieldsToCheck.join(', ')}`);
console.log('');
try {
// Test connection
console.log('🔗 Connecting to Elasticsearch...');
await client.cluster.health({});
console.log('🔗 Connecting to Redis...');
await client.ping();
console.log('✅ Connected successfully\n');
// Get index stats
const countResponse = await client.count({ index: INDEX_NAME });
console.log(`📊 Total documents in index: ${countResponse.count}\n`);
const allDuplicates: DuplicateGroup[] = [];
const seenDeleteIds = new Set<string>();
// Find duplicates for each field
for (const field of fields) {
console.log(`🔍 Checking duplicates for field: ${field}...`);
const fieldDuplicates = await findDuplicatesForField(client, field, parsedArgs.batchSize);
// Filter out already seen delete IDs to avoid counting the same document multiple times
for (const dup of fieldDuplicates) {
const newDeleteIds = dup.deleteIds.filter(id => !seenDeleteIds.has(id));
if (newDeleteIds.length > 0) {
dup.deleteIds = newDeleteIds;
newDeleteIds.forEach(id => seenDeleteIds.add(id));
allDuplicates.push(dup);
}
}
console.log(` Found ${fieldDuplicates.length} duplicate groups for ${field}`);
for (const field of fieldsToCheck) {
const duplicates = await findDuplicatesForField(client, field, parsed.batchSize);
allDuplicates.push(...duplicates);
console.log(` Found ${duplicates.length} duplicate groups for ${field}`);
}
const totalToDelete = allDuplicates.reduce((sum, dup) => sum + dup.deleteIds.length, 0);
console.log(`\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
console.log(`📋 Summary:`);
console.log(` Duplicate groups found: ${allDuplicates.length}`);
console.log(` Documents to delete: ${totalToDelete}`);
console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n`);
console.log('');
console.log(`📊 Total duplicate groups found: ${allDuplicates.length}`);
if (allDuplicates.length === 0) {
console.log(' No duplicates found! Index is clean.\n');
return;
}
console.log(' No duplicates found!');
} else {
const totalToDelete = allDuplicates.reduce(
(sum, dup) => sum + dup.deletePlaintexts.length,
0
);
console.log(` Total documents to delete: ${totalToDelete}`);
// Show sample of duplicates
console.log(`📝 Sample duplicates (showing first 10):\n`);
const samplesToShow = allDuplicates.slice(0, 10);
for (const dup of samplesToShow) {
const truncatedValue = dup.value.length > 50
? dup.value.substring(0, 50) + '...'
: dup.value;
console.log(` Field: ${dup.field}`);
console.log(` Value: ${truncatedValue}`);
console.log(` Keep: ${dup.keepId}`);
console.log(` Delete: ${dup.deleteIds.length} document(s)`);
console.log('');
}
const { deleted, errors } = await removeDuplicates(client, allDuplicates, parsed.dryRun);
if (allDuplicates.length > 10) {
console.log(` ... and ${allDuplicates.length - 10} more duplicate groups\n`);
}
if (parsedArgs.dryRun) {
console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
console.log(`🔎 DRY RUN - No changes made`);
console.log(` Run with --execute to remove ${totalToDelete} duplicate documents`);
console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n`);
return;
}
// Execute deletion
console.log(`\n🗑 Removing ${totalToDelete} duplicate documents...\n`);
let deleted = 0;
let errors = 0;
const deleteIds = allDuplicates.flatMap(dup => dup.deleteIds);
// Delete in batches
for (let i = 0; i < deleteIds.length; i += parsedArgs.batchSize) {
const batch = deleteIds.slice(i, i + parsedArgs.batchSize);
try {
const bulkOperations = batch.flatMap(id => [
{ delete: { _index: INDEX_NAME, _id: id } }
]);
const bulkResponse = await client.bulk({
operations: bulkOperations,
refresh: false
});
if (bulkResponse.errors) {
const errorCount = bulkResponse.items.filter((item: any) => item.delete?.error).length;
errors += errorCount;
deleted += batch.length - errorCount;
} else {
deleted += batch.length;
}
process.stdout.write(`\r⏳ Progress: ${Math.min(i + parsedArgs.batchSize, deleteIds.length)}/${deleteIds.length} - Deleted: ${deleted}, Errors: ${errors}`);
} catch (error) {
console.error(`\n❌ Error deleting batch:`, error);
errors += batch.length;
if (!parsed.dryRun) {
console.log('━'.repeat(42));
console.log('✅ Removal complete!');
console.log('');
console.log('📊 Statistics:');
console.log(` Deleted: ${deleted}`);
console.log(` Errors: ${errors}`);
} else {
console.log('━'.repeat(42));
console.log('💡 This was a dry run. Use --execute to actually remove duplicates.');
}
}
// Refresh index
console.log('\n\n🔄 Refreshing index...');
await client.indices.refresh({ index: INDEX_NAME });
// Get new count
const newCountResponse = await client.count({ index: INDEX_NAME });
console.log('\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━');
console.log('✅ Duplicate removal complete!');
console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
console.log(`Documents deleted: ${deleted}`);
console.log(`Errors: ${errors}`);
console.log(`Previous document count: ${countResponse.count}`);
console.log(`New document count: ${newCountResponse.count}`);
console.log('');
await client.quit();
} catch (error) {
console.error('\n❌ Error:', error instanceof Error ? error.message : error);
console.error('\n\n❌ Error:', error);
await client.quit();
process.exit(1);
}
}
// Script entry sequence: parse CLI arguments, optionally show help, validate
// the requested field, echo the effective configuration, then start the run.
// NOTE(review): this span contains two start-up calls (`removeDuplicates(parsedArgs)`
// and `main()`), while elsewhere in this file `removeDuplicates` is invoked as
// `removeDuplicates(client, allDuplicates, parsed.dryRun)`. This looks like an older
// and a newer entry sequence side by side; confirm which one is intended.
// Parse command line arguments (drop the first two argv entries, keep the rest)
const args = process.argv.slice(2);
const parsedArgs = parseArgs(args);
// Show usage when the help flag was parsed.
// NOTE(review): presumably showHelp() ends the process; if it returns, execution
// continues with validation below. Confirm against its definition.
if (parsedArgs.showHelp) {
showHelp();
}
// Validate field if provided: it must be one of the names listed here, otherwise
// the accepted names are printed to stderr and the process exits with code 1.
const validFields = ['plaintext', 'md5', 'sha1', 'sha256', 'sha512'];
if (parsedArgs.field && !validFields.includes(parsedArgs.field)) {
console.error(`❌ Invalid field: ${parsedArgs.field}`);
console.error(` Valid fields: ${validFields.join(', ')}`);
process.exit(1);
}
// Echo the effective configuration before doing any work.
console.log(`\n🔧 Configuration:`);
// dryRun selects the label printed here: 'dry-run' when true, 'execute' otherwise.
console.log(` Mode: ${parsedArgs.dryRun ? 'dry-run' : 'execute'}`);
console.log(` Batch size: ${parsedArgs.batchSize}`);
// With no explicit field, the message reports that all five fields are used.
if (parsedArgs.field) {
console.log(` Field: ${parsedArgs.field}`);
} else {
console.log(` Fields: all (plaintext, md5, sha1, sha256, sha512)`);
}
console.log('');
// Start the run; a rejection is only passed to console.error here, and this line
// does not itself set an exit code.
removeDuplicates(parsedArgs).catch(console.error);
// NOTE(review): main() is invoked without await or .catch on this line; the visible
// catch block above calls process.exit(1), presumably inside main. Confirm.
main();