diff --git a/scripts/index-file.ts b/scripts/index-file.ts
index a094e84..6d9a018 100644
--- a/scripts/index-file.ts
+++ b/scripts/index-file.ts
@@ -7,18 +7,20 @@
  * all the generated hashes into Elasticsearch.
  *
  * Usage:
- *   npm run index-file
- *   or
- *   node scripts/index-file.js
+ *   npx tsx scripts/index-file.ts <file> [options]
+ *   npm run index-file -- <file> [options]
  *
  * Options:
- *   --batch-size  Number of items to process in each batch (default: 100)
- *   --help  Show this help message
+ *   --batch-size=<n>     Number of items to process in each batch (default: 100)
+ *   --resume             Resume from last saved state (default: true)
+ *   --no-resume          Start from beginning, ignore saved state
+ *   --state-file=<path>  Custom state file path (default: .indexer-state-<file>.json)
+ *   --help, -h           Show this help message
  */
 
 import { Client } from '@elastic/elasticsearch';
-import { createReadStream } from 'fs';
-import { resolve } from 'path';
+import { createReadStream, existsSync, readFileSync, writeFileSync, unlinkSync, statSync } from 'fs';
+import { resolve, basename } from 'path';
 import { createInterface } from 'readline';
 import crypto from 'crypto';
@@ -36,6 +38,120 @@ interface HashDocument {
   created_at: string;
 }
 
+interface IndexerState {
+  filePath: string;
+  fileHash: string;
+  lastProcessedLine: number;
+  totalLines: number;
+  indexed: number;
+  skipped: number;
+  errors: number;
+  startTime: number;
+  lastUpdate: string;
+}
+
+interface ParsedArgs {
+  filePath: string | null;
+  batchSize: number;
+  resume: boolean;
+  stateFile: string | null;
+  showHelp: boolean;
+}
+
+function parseArgs(args: string[]): ParsedArgs {
+  const result: ParsedArgs = {
+    filePath: null,
+    batchSize: DEFAULT_BATCH_SIZE,
+    resume: true,
+    stateFile: null,
+    showHelp: false
+  };
+
+  for (let i = 0; i < args.length; i++) {
+    const arg = args[i];
+
+    if (arg === '--help' || arg === '-h') {
+      result.showHelp = true;
+    } else if (arg === '--resume') {
+      result.resume = true;
+    } else if (arg === '--no-resume') {
+      result.resume = false;
+    } else if (arg.startsWith('--batch-size=')) {
+      const value = arg.split('=')[1];
+      const parsed = parseInt(value, 10);
+      if (!isNaN(parsed) && parsed > 0) {
+        result.batchSize = parsed;
+      }
+    } else if (arg === '--batch-size') {
+      // Support the space-separated --batch-size <n> format
+      const nextArg = args[i + 1];
+      if (nextArg && !nextArg.startsWith('-')) {
+        const parsed = parseInt(nextArg, 10);
+        if (!isNaN(parsed) && parsed > 0) {
+          result.batchSize = parsed;
+          i++; // Skip next argument
+        }
+      }
+    } else if (arg.startsWith('--state-file=')) {
+      result.stateFile = arg.split('=')[1];
+    } else if (arg === '--state-file') {
+      const nextArg = args[i + 1];
+      if (nextArg && !nextArg.startsWith('-')) {
+        result.stateFile = nextArg;
+        i++;
+      }
+    } else if (!arg.startsWith('-')) {
+      // Positional argument - treat as file path
+      result.filePath = arg;
+    }
+  }
+
+  return result;
+}
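+
+// Illustration (not part of the CLI contract): a call like
+//   parseArgs(['wordlist.txt', '--batch-size=500', '--no-resume'])
+// returns
+//   { filePath: 'wordlist.txt', batchSize: 500, resume: false,
+//     stateFile: null, showHelp: false }
+// Flags are scanned left to right, so the last of --resume/--no-resume wins.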
+
+function getFileHash(filePath: string): string {
+  // Create a fingerprint from the file path, size, and mtime for quick identification
+  const stats = statSync(filePath);
+  const hashInput = `${filePath}:${stats.size}:${stats.mtime.getTime()}`;
+  return crypto.createHash('md5').update(hashInput).digest('hex').substring(0, 8);
+}
+
+function getDefaultStateFile(filePath: string): string {
+  const fileName = basename(filePath).replace(/\.[^.]+$/, '');
+  return resolve(`.indexer-state-${fileName}.json`);
+}
+
+function loadState(stateFile: string): IndexerState | null {
+  try {
+    if (existsSync(stateFile)) {
+      const data = readFileSync(stateFile, 'utf-8');
+      return JSON.parse(data) as IndexerState;
+    }
+  } catch (error) {
+    console.warn(`⚠️ Could not load state file: ${error}`);
+  }
+  return null;
+}
+
+function saveState(stateFile: string, state: IndexerState): void {
+  try {
+    state.lastUpdate = new Date().toISOString();
+    writeFileSync(stateFile, JSON.stringify(state, null, 2), 'utf-8');
+  } catch (error) {
+    console.error(`❌ Could not save state file: ${error}`);
+  }
+}
+
+function deleteState(stateFile: string): void {
+  try {
+    if (existsSync(stateFile)) {
+      unlinkSync(stateFile);
+    }
+  } catch (error) {
+    console.warn(`⚠️ Could not delete state file: ${error}`);
+  }
+}
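+
+// The state file is plain JSON; with illustrative (made-up) values it looks like:
+// {
+//   "filePath": "/data/wordlist.txt",
+//   "fileHash": "3f9a12bc",
+//   "lastProcessedLine": 120000,
+//   "totalLines": 120000,
+//   "indexed": 95000,
+//   "skipped": 24900,
+//   "errors": 100,
+//   "startTime": 1700000000000,
+//   "lastUpdate": "2024-01-01T00:00:00.000Z"
+// }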
 
 async function generateHashes(plaintext: string): Promise<HashDocument> {
   const bcrypt = await import('bcrypt');
   const bcryptHash = await bcrypt.default.hash(plaintext, 10);
@@ -56,25 +172,71 @@ function showHelp() {
 Hasher Indexer Script
 
 Usage:
-  npm run index-file
-  node scripts/index-file.js
+  npx tsx scripts/index-file.ts <file> [options]
+  npm run index-file -- <file> [options]
 
 Options:
-  --batch-size  Number of items to process in each batch (default: 100)
-  --help  Show this help message
+  --batch-size=<n>     Number of items to process in each batch (default: 100)
+  --batch-size <n>     Alternative syntax for batch size
+  --resume             Resume from last saved state (default)
+  --no-resume          Start from beginning, ignore saved state
+  --state-file=<path>  Custom state file path
+  --help, -h           Show this help message
 
 Environment Variables:
   ELASTICSEARCH_NODE   Elasticsearch node URL (default: http://localhost:9200)
 
-Example:
-  npm run index-file wordlist.txt
-  npm run index-file wordlist.txt -- --batch-size 500
+Examples:
+  npx tsx scripts/index-file.ts wordlist.txt
+  npx tsx scripts/index-file.ts wordlist.txt --batch-size=500
+  npx tsx scripts/index-file.ts wordlist.txt --batch-size 500
+  npx tsx scripts/index-file.ts wordlist.txt --no-resume
+  npm run index-file -- wordlist.txt --batch-size=500
+
+State Management:
+  The script automatically saves progress to a state file. If interrupted,
+  it will resume from where it left off on the next run. Use --no-resume
+  to start fresh.
 `);
   process.exit(0);
 }
 
-async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZE) {
+async function indexFile(filePath: string, batchSize: number, shouldResume: boolean, customStateFile: string | null) {
   const client = new Client({ node: ELASTICSEARCH_NODE });
+  const absolutePath = resolve(filePath);
+  const stateFile = customStateFile || getDefaultStateFile(absolutePath);
+  const fileHash = getFileHash(absolutePath);
+
+  // State management
+  let state: IndexerState = {
+    filePath: absolutePath,
+    fileHash,
+    lastProcessedLine: 0,
+    totalLines: 0,
+    indexed: 0,
+    skipped: 0,
+    errors: 0,
+    startTime: Date.now(),
+    lastUpdate: new Date().toISOString()
+  };
+
+  // Check for existing state
+  const existingState = loadState(stateFile);
+  let resumingFrom = 0;
+
+  if (shouldResume && existingState) {
+    if (existingState.fileHash === fileHash) {
+      state = existingState;
+      resumingFrom = state.lastProcessedLine;
+      state.startTime = Date.now(); // Reset start time for this session
+      console.log(`📂 Found existing state, resuming from line ${resumingFrom}`);
+    } else {
+      console.log(`⚠️ File has changed since last run, starting fresh`);
+      deleteState(stateFile);
+    }
+  } else if (!shouldResume) {
+    deleteState(stateFile);
+  }
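+
+  // The fingerprint from getFileHash (md5 of `path:size:mtime`, first 8 hex
+  // chars) ties a state file to one version of the input. For example, a
+  // re-downloaded wordlist gets a new mtime, so stale line offsets are
+  // discarded rather than applied to a file they no longer describe.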
 
   console.log(`📚 Hasher Indexer`);
   console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
@@ -82,8 +244,33 @@ async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZ
   console.log(`Index: ${INDEX_NAME}`);
   console.log(`File: ${filePath}`);
   console.log(`Batch size: ${batchSize}`);
+  console.log(`State file: ${stateFile}`);
+  if (resumingFrom > 0) {
+    console.log(`Resuming from: line ${resumingFrom}`);
+    console.log(`Already indexed: ${state.indexed}`);
+    console.log(`Already skipped: ${state.skipped}`);
+  }
   console.log('');
 
+  // Handle interruption signals
+  let isInterrupted = false;
+  const handleInterrupt = () => {
+    if (isInterrupted) {
+      console.log('\n\n⚡ Force quit. Progress saved.');
+      process.exit(1);
+    }
+    isInterrupted = true;
+    console.log('\n\n⏸️ Interrupted! Saving state...');
+    saveState(stateFile, state);
+    console.log(`💾 State saved to ${stateFile}`);
+    console.log(`   Resume with: npx tsx scripts/index-file.ts ${filePath}`);
+    console.log(`   Or start fresh with: npx tsx scripts/index-file.ts ${filePath} --no-resume`);
+    process.exit(0);
+  };
+
+  process.on('SIGINT', handleInterrupt);
+  process.on('SIGTERM', handleInterrupt);
+
   try {
     // Test connection
     console.log('🔗 Connecting to Elasticsearch...');
@@ -92,15 +279,13 @@
     // Process file line by line using streams
     console.log('📖 Processing file...\n');
 
-    const absolutePath = resolve(filePath);
-
-    let totalLines = 0;
-    let indexed = 0;
-    let skipped = 0;
-    let errors = 0;
-    const startTime = Date.now();
+    let currentLineNumber = 0;
     let currentBatch: string[] = [];
+    let sessionIndexed = 0;
+    let sessionSkipped = 0;
+    let sessionErrors = 0;
+    let batchesProcessed = 0;
+    const sessionStartTime = Date.now();
 
     const fileStream = createReadStream(absolutePath, { encoding: 'utf-8' });
     const rl = createInterface({
@@ -108,8 +293,9 @@
       crlfDelay: Infinity
     });
 
-    const processBatch = async (batch: string[]) => {
+    const processBatch = async (batch: string[], lineNumber: number) => {
       if (batch.length === 0) return;
+      if (isInterrupted) return;
 
       const bulkOperations: any[] = [];
@@ -157,6 +343,7 @@
       });
 
       // Prepare bulk operations only for items that don't have any duplicate hash
+      let batchSkipped = 0;
      for (const item of batchWithHashes) {
        const isDuplicate =
          existingHashes.has(item.plaintext) ||
@@ -169,7 +356,9 @@
          bulkOperations.push({ index: { _index: INDEX_NAME } });
          bulkOperations.push(item.hashes);
        } else {
-          skipped++;
+          batchSkipped++;
+          state.skipped++;
+          sessionSkipped++;
        }
      }
@@ -183,32 +372,57 @@
        if (bulkResponse.errors) {
          const errorCount = bulkResponse.items.filter((item: any) => item.index?.error).length;
-          errors += errorCount;
-          indexed += (bulkOperations.length / 2) - errorCount;
+          state.errors += errorCount;
+          sessionErrors += errorCount;
+          const successCount = (bulkOperations.length / 2) - errorCount;
+          state.indexed += successCount;
+          sessionIndexed += successCount;
        } else {
-          indexed += bulkOperations.length / 2;
+          const count = bulkOperations.length / 2;
+          state.indexed += count;
+          sessionIndexed += count;
        }
      } catch (error) {
        console.error(`\n❌ Error processing batch:`, error);
-        errors += bulkOperations.length / 2;
+        const count = bulkOperations.length / 2;
+        state.errors += count;
+        sessionErrors += count;
      }
    }
 
+      // Update state
+      state.lastProcessedLine = lineNumber;
+      state.totalLines = lineNumber;
+
+      // Save state periodically (every 10 batches)
+      batchesProcessed++;
+      if (batchesProcessed % 10 === 0) {
+        saveState(stateFile, state);
+      }
+
       // Progress indicator
-      process.stdout.write(`\r⏳ Processed: ${totalLines} - Indexed: ${indexed}, Skipped: ${skipped}, Errors: ${errors}`);
+      const elapsed = ((Date.now() - sessionStartTime) / 1000).toFixed(0);
+      process.stdout.write(`\r⏳ Line: ${lineNumber} | Session: +${sessionIndexed} indexed, +${sessionSkipped} skipped | Total: ${state.indexed} indexed | Time: ${elapsed}s`);
    };
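+
+    // For reference: the bulk request body built above alternates an action
+    // row and a document row per item, roughly (fields abbreviated; the exact
+    // document shape is the HashDocument interface):
+    //   { index: { _index: INDEX_NAME } }
+    //   { ...hash fields..., created_at: '...' }
+    // This pairing is why the success/error accounting divides
+    // bulkOperations.length by 2.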
 
     for await (const line of rl) {
+      if (isInterrupted) break;
+
+      currentLineNumber++;
+
+      // Skip already processed lines
+      if (currentLineNumber <= resumingFrom) {
+        continue;
+      }
+
       const trimmedLine = line.trim();
       if (trimmedLine.length > 0) {
         // Only take first word (no spaces or separators)
         const firstWord = trimmedLine.split(/\s+/)[0];
         if (firstWord) {
-          totalLines++;
           currentBatch.push(firstWord);
           if (currentBatch.length >= batchSize) {
-            await processBatch(currentBatch);
+            await processBatch(currentBatch, currentLineNumber);
             currentBatch = [];
           }
         }
@@ -216,45 +430,77 @@
       }
     }
 
     // Process remaining items in last batch
-    if (currentBatch.length > 0) {
-      await processBatch(currentBatch);
+    if (currentBatch.length > 0 && !isInterrupted) {
+      await processBatch(currentBatch, currentLineNumber);
+    }
+
+    if (isInterrupted) {
+      return;
     }
 
     // Refresh index
     console.log('\n\n🔄 Refreshing index...');
     await client.indices.refresh({ index: INDEX_NAME });
 
-    const duration = ((Date.now() - startTime) / 1000).toFixed(2);
-    const rate = (indexed / parseFloat(duration)).toFixed(0);
+    // Delete state file on successful completion
+    deleteState(stateFile);
+
+    const duration = ((Date.now() - sessionStartTime) / 1000).toFixed(2);
+    const rate = sessionIndexed > 0 ? (sessionIndexed / parseFloat(duration)).toFixed(0) : '0';
 
     console.log('\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━');
     console.log('✅ Indexing complete!');
     console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
-    console.log(`Total processed: ${totalLines}`);
-    console.log(`Successfully indexed: ${indexed}`);
-    console.log(`Skipped (duplicates): ${skipped}`);
-    console.log(`Errors: ${errors}`);
-    console.log(`Duration: ${duration}s`);
-    console.log(`Rate: ${rate} docs/sec`);
+    console.log(`Total lines processed: ${currentLineNumber}`);
+    if (resumingFrom > 0) {
+      console.log(`Lines skipped (resumed): ${resumingFrom}`);
+      console.log(`Lines processed this session: ${currentLineNumber - resumingFrom}`);
+    }
+    console.log(`Successfully indexed (total): ${state.indexed}`);
+    console.log(`Successfully indexed (session): ${sessionIndexed}`);
+    console.log(`Skipped duplicates (total): ${state.skipped}`);
+    console.log(`Skipped duplicates (session): ${sessionSkipped}`);
+    console.log(`Errors (total): ${state.errors}`);
+    console.log(`Session duration: ${duration}s`);
+    console.log(`Session rate: ${rate} docs/sec`);
     console.log('');
   } catch (error) {
+    // Save state on error
+    saveState(stateFile, state);
+    console.error(`\n💾 State saved to ${stateFile}`);
-    console.error('\n❌ Error:', error instanceof Error ? error.message : error);
+    console.error('❌ Error:', error instanceof Error ? error.message : error);
     process.exit(1);
+  } finally {
+    // Remove signal handlers
+    process.removeListener('SIGINT', handleInterrupt);
+    process.removeListener('SIGTERM', handleInterrupt);
   }
 }
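+
+// indexFile can also be driven programmatically; an illustrative (hypothetical)
+// call with the same defaults the CLI below uses:
+//   await indexFile('wordlist.txt', DEFAULT_BATCH_SIZE, true, null);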
 
 // Parse command line arguments
 const args = process.argv.slice(2);
+const parsedArgs = parseArgs(args);
 
-if (args.length === 0 || args.includes('--help') || args.includes('-h')) {
+if (parsedArgs.showHelp || !parsedArgs.filePath) {
   showHelp();
 }
 
-const filePath = args[0];
-const batchSizeIndex = args.indexOf('--batch-size');
-const batchSize = batchSizeIndex !== -1 && args[batchSizeIndex + 1]
-  ? parseInt(args[batchSizeIndex + 1], 10)
-  : DEFAULT_BATCH_SIZE;
+const filePath = parsedArgs.filePath as string;
 
-indexFile(filePath, batchSize).catch(console.error);
+// Validate file exists
+if (!existsSync(filePath)) {
+  console.error(`❌ File not found: ${filePath}`);
+  process.exit(1);
+}
+
+console.log(`\n🔧 Configuration:`);
+console.log(`   File: ${filePath}`);
+console.log(`   Batch size: ${parsedArgs.batchSize}`);
+console.log(`   Resume: ${parsedArgs.resume}`);
+if (parsedArgs.stateFile) {
+  console.log(`   State file: ${parsedArgs.stateFile}`);
+}
+console.log('');
+
+indexFile(filePath, parsedArgs.batchSize, parsedArgs.resume, parsedArgs.stateFile).catch(console.error);
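+
+// Note: .catch(console.error) logs a rejected promise but still exits with
+// code 0. A stricter variant (a sketch, not part of this change) would be:
+//   indexFile(...).catch((err) => { console.error(err); process.exit(1); });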