refactor load script

Signed-off-by: ale <ale@manalejandro.com>
This commit is contained in:
ale
2025-12-05 14:21:50 +01:00
parent 0bae36b446
commit ee2aaccffe

View file

@@ -7,18 +7,20 @@
* all the generated hashes into Elasticsearch.
*
* Usage:
* npm run index-file <path-to-file.txt>
* or
* node scripts/index-file.js <path-to-file.txt>
* npx tsx scripts/index-file.ts <path-to-file.txt> [options]
* npm run index-file -- <path-to-file.txt> [options]
*
* Options:
* --batch-size <number> Number of items to process in each batch (default: 100)
* --help Show this help message
* --batch-size=<number> Number of items to process in each batch (default: 100)
* --resume Resume from last saved state (default: true)
* --no-resume Start from beginning, ignore saved state
* --state-file=<path> Custom state file path (default: .indexer-state-<filename>.json)
* --help, -h Show this help message
*/
import { Client } from '@elastic/elasticsearch';
import { createReadStream } from 'fs';
import { resolve } from 'path';
import { createReadStream, existsSync, readFileSync, writeFileSync, unlinkSync, statSync } from 'fs';
import { resolve, basename } from 'path';
import { createInterface } from 'readline';
import crypto from 'crypto';
@@ -36,6 +38,120 @@ interface HashDocument {
created_at: string;
}
// Snapshot of indexing progress persisted to a JSON state file so an
// interrupted run can resume from the last processed line.
interface IndexerState {
  filePath: string;          // absolute path of the file being indexed
  fileHash: string;          // short fingerprint (path + size + mtime) used to detect file changes between runs
  lastProcessedLine: number; // line number of the last line handed to a processed batch
  totalLines: number;        // total lines seen so far (kept in step with lastProcessedLine)
  indexed: number;           // documents successfully indexed across all sessions
  skipped: number;           // duplicates skipped across all sessions
  errors: number;            // bulk-indexing errors across all sessions
  startTime: number;         // epoch ms when the current session started (reset on resume)
  lastUpdate: string;        // ISO timestamp written on every state save
}
// Result of command-line parsing for the indexer script.
interface ParsedArgs {
  filePath: string | null;  // positional argument: path to the wordlist file
  batchSize: number;        // documents per bulk request
  resume: boolean;          // whether to resume from a saved state file
  stateFile: string | null; // custom state-file path, or null for the default
  showHelp: boolean;        // true when --help / -h was requested
}

/**
 * Parse CLI arguments for the indexer.
 *
 * Supported forms: --help | -h, --resume, --no-resume,
 * --batch-size=<n> and --batch-size <n>,
 * --state-file=<path> and --state-file <path>,
 * plus one positional file path (the last positional wins).
 *
 * Invalid or missing option values fall back to defaults rather than
 * throwing, matching the script's forgiving CLI style.
 */
function parseArgs(args: string[]): ParsedArgs {
  const result: ParsedArgs = {
    filePath: null,
    batchSize: DEFAULT_BATCH_SIZE,
    resume: true,
    stateFile: null,
    showHelp: false
  };

  for (let i = 0; i < args.length; i++) {
    const arg = args[i];
    if (arg === '--help' || arg === '-h') {
      result.showHelp = true;
    } else if (arg === '--resume') {
      result.resume = true;
    } else if (arg === '--no-resume') {
      result.resume = false;
    } else if (arg.startsWith('--batch-size=')) {
      const value = arg.slice('--batch-size='.length);
      const parsed = parseInt(value, 10);
      if (!isNaN(parsed) && parsed > 0) {
        result.batchSize = parsed;
      }
    } else if (arg === '--batch-size') {
      // Support "--batch-size <value>" format.
      const nextArg = args[i + 1];
      if (nextArg && !nextArg.startsWith('-')) {
        const parsed = parseInt(nextArg, 10);
        if (!isNaN(parsed) && parsed > 0) {
          result.batchSize = parsed;
          i++; // consume the value argument
        }
      }
    } else if (arg.startsWith('--state-file=')) {
      // BUGFIX: slice instead of split('=')[1], so paths containing '='
      // (e.g. --state-file=a=b.json) are not truncated at the first '='.
      result.stateFile = arg.slice('--state-file='.length);
    } else if (arg === '--state-file') {
      const nextArg = args[i + 1];
      if (nextArg && !nextArg.startsWith('-')) {
        result.stateFile = nextArg;
        i++;
      }
    } else if (!arg.startsWith('-')) {
      // Positional argument — treat as the input file path.
      result.filePath = arg;
    }
  }

  return result;
}
/**
 * Build a short fingerprint for a file from its path, size and mtime,
 * used to detect whether the input file changed between runs.
 * md5 truncated to 8 hex chars — identification only, not security.
 */
function getFileHash(filePath: string): string {
  // Use the statically imported statSync: the original called require('fs'),
  // which is undefined when this script runs as an ES module under tsx.
  const stats = statSync(filePath);
  const hashInput = `${filePath}:${stats.size}:${stats.mtime.getTime()}`;
  return crypto.createHash('md5').update(hashInput).digest('hex').substring(0, 8);
}
/**
 * Default state-file path for a given input file:
 * ".indexer-state-<name>.json" resolved against the current working
 * directory, where <name> is the input's base name with its final
 * extension stripped.
 */
function getDefaultStateFile(filePath: string): string {
  const stem = basename(filePath).replace(/\.[^.]+$/, '');
  const stateFileName = `.indexer-state-${stem}.json`;
  return resolve(stateFileName);
}
/**
 * Load a previously saved indexer state from disk.
 * Returns null when the file is absent or unreadable/unparsable;
 * read failures are reported as a warning, never thrown.
 */
function loadState(stateFile: string): IndexerState | null {
  try {
    if (!existsSync(stateFile)) {
      return null;
    }
    const raw = readFileSync(stateFile, 'utf-8');
    return JSON.parse(raw) as IndexerState;
  } catch (error) {
    console.warn(`⚠️ Could not load state file: ${error}`);
    return null;
  }
}
/**
 * Persist the indexer state as pretty-printed JSON, stamping
 * state.lastUpdate with the current time first. Write failures are
 * logged, never thrown, so a save problem cannot abort the run.
 */
function saveState(stateFile: string, state: IndexerState): void {
  state.lastUpdate = new Date().toISOString();
  const serialized = JSON.stringify(state, null, 2);
  try {
    writeFileSync(stateFile, serialized, 'utf-8');
  } catch (error) {
    console.error(`❌ Could not save state file: ${error}`);
  }
}
/**
 * Remove the state file if it exists. Errors (e.g. permissions) are
 * reported as warnings and swallowed — cleanup is best-effort.
 */
function deleteState(stateFile: string): void {
  try {
    if (!existsSync(stateFile)) {
      return; // nothing to clean up
    }
    unlinkSync(stateFile);
  } catch (error) {
    console.warn(`⚠️ Could not delete state file: ${error}`);
  }
}
async function generateHashes(plaintext: string): Promise<HashDocument> {
const bcrypt = await import('bcrypt');
const bcryptHash = await bcrypt.default.hash(plaintext, 10);
@@ -56,25 +172,71 @@ function showHelp() {
Hasher Indexer Script
Usage:
npm run index-file <path-to-file.txt>
node scripts/index-file.js <path-to-file.txt>
npx tsx scripts/index-file.ts <path-to-file.txt> [options]
npm run index-file -- <path-to-file.txt> [options]
Options:
--batch-size <number> Number of items to process in each batch (default: 100)
--help Show this help message
--batch-size=<number> Number of items to process in each batch (default: 100)
--batch-size <number> Alternative syntax for batch size
--resume Resume from last saved state (default)
--no-resume Start from beginning, ignore saved state
--state-file=<path> Custom state file path
--help, -h Show this help message
Environment Variables:
ELASTICSEARCH_NODE Elasticsearch node URL (default: http://localhost:9200)
Example:
npm run index-file wordlist.txt
npm run index-file wordlist.txt -- --batch-size 500
Examples:
npx tsx scripts/index-file.ts wordlist.txt
npx tsx scripts/index-file.ts wordlist.txt --batch-size=500
npx tsx scripts/index-file.ts wordlist.txt --batch-size 500
npx tsx scripts/index-file.ts wordlist.txt --no-resume
npm run index-file -- wordlist.txt --batch-size=500
State Management:
The script automatically saves progress to a state file. If interrupted,
it will resume from where it left off on the next run. Use --no-resume
to start fresh.
`);
process.exit(0);
}
async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZE) {
async function indexFile(filePath: string, batchSize: number, shouldResume: boolean, customStateFile: string | null) {
const client = new Client({ node: ELASTICSEARCH_NODE });
const absolutePath = resolve(filePath);
const stateFile = customStateFile || getDefaultStateFile(absolutePath);
const fileHash = getFileHash(absolutePath);
// State management
let state: IndexerState = {
filePath: absolutePath,
fileHash,
lastProcessedLine: 0,
totalLines: 0,
indexed: 0,
skipped: 0,
errors: 0,
startTime: Date.now(),
lastUpdate: new Date().toISOString()
};
// Check for existing state
const existingState = loadState(stateFile);
let resumingFrom = 0;
if (shouldResume && existingState) {
if (existingState.fileHash === fileHash) {
state = existingState;
resumingFrom = state.lastProcessedLine;
state.startTime = Date.now(); // Reset start time for this session
console.log(`📂 Found existing state, resuming from line ${resumingFrom}`);
} else {
console.log(`⚠️ File has changed since last run, starting fresh`);
deleteState(stateFile);
}
} else if (!shouldResume) {
deleteState(stateFile);
}
console.log(`📚 Hasher Indexer`);
console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
@@ -82,8 +244,33 @@ async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZ
console.log(`Index: ${INDEX_NAME}`);
console.log(`File: ${filePath}`);
console.log(`Batch size: ${batchSize}`);
console.log(`State file: ${stateFile}`);
if (resumingFrom > 0) {
console.log(`Resuming from: line ${resumingFrom}`);
console.log(`Already indexed: ${state.indexed}`);
console.log(`Already skipped: ${state.skipped}`);
}
console.log('');
// Handle interruption signals
let isInterrupted = false;
const handleInterrupt = () => {
if (isInterrupted) {
console.log('\n\n⚡ Force quit. Progress saved.');
process.exit(1);
}
isInterrupted = true;
console.log('\n\n⏸ Interrupted! Saving state...');
saveState(stateFile, state);
console.log(`💾 State saved to ${stateFile}`);
console.log(` Resume with: npx tsx scripts/index-file.ts ${filePath}`);
console.log(` Or start fresh with: npx tsx scripts/index-file.ts ${filePath} --no-resume`);
process.exit(0);
};
process.on('SIGINT', handleInterrupt);
process.on('SIGTERM', handleInterrupt);
try {
// Test connection
console.log('🔗 Connecting to Elasticsearch...');
@@ -92,15 +279,13 @@ async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZ
// Process file line by line using streams
console.log('📖 Processing file...\n');
const absolutePath = resolve(filePath);
let totalLines = 0;
let indexed = 0;
let skipped = 0;
let errors = 0;
const startTime = Date.now();
let currentLineNumber = 0;
let currentBatch: string[] = [];
let sessionIndexed = 0;
let sessionSkipped = 0;
let sessionErrors = 0;
const sessionStartTime = Date.now();
const fileStream = createReadStream(absolutePath, { encoding: 'utf-8' });
const rl = createInterface({
@@ -108,8 +293,9 @@ async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZ
crlfDelay: Infinity
});
const processBatch = async (batch: string[]) => {
const processBatch = async (batch: string[], lineNumber: number) => {
if (batch.length === 0) return;
if (isInterrupted) return;
const bulkOperations: any[] = [];
@@ -157,6 +343,7 @@ async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZ
});
// Prepare bulk operations only for items that don't have any duplicate hash
let batchSkipped = 0;
for (const item of batchWithHashes) {
const isDuplicate =
existingHashes.has(item.plaintext) ||
@@ -169,7 +356,9 @@ async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZ
bulkOperations.push({ index: { _index: INDEX_NAME } });
bulkOperations.push(item.hashes);
} else {
skipped++;
batchSkipped++;
state.skipped++;
sessionSkipped++;
}
}
@@ -183,32 +372,57 @@ async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZ
if (bulkResponse.errors) {
const errorCount = bulkResponse.items.filter((item: any) => item.index?.error).length;
errors += errorCount;
indexed += (bulkOperations.length / 2) - errorCount;
state.errors += errorCount;
sessionErrors += errorCount;
const successCount = (bulkOperations.length / 2) - errorCount;
state.indexed += successCount;
sessionIndexed += successCount;
} else {
indexed += bulkOperations.length / 2;
const count = bulkOperations.length / 2;
state.indexed += count;
sessionIndexed += count;
}
} catch (error) {
console.error(`\n❌ Error processing batch:`, error);
errors += bulkOperations.length / 2;
const count = bulkOperations.length / 2;
state.errors += count;
sessionErrors += count;
}
}
// Update state
state.lastProcessedLine = lineNumber;
state.totalLines = lineNumber;
// Save state periodically (every 10 batches)
if (lineNumber % (batchSize * 10) === 0) {
saveState(stateFile, state);
}
// Progress indicator
process.stdout.write(`\r⏳ Processed: ${totalLines} - Indexed: ${indexed}, Skipped: ${skipped}, Errors: ${errors}`);
const elapsed = ((Date.now() - sessionStartTime) / 1000).toFixed(0);
process.stdout.write(`\r⏳ Line: ${lineNumber} | Session: +${sessionIndexed} indexed, +${sessionSkipped} skipped | Total: ${state.indexed} indexed | Time: ${elapsed}s`);
};
for await (const line of rl) {
if (isInterrupted) break;
currentLineNumber++;
// Skip already processed lines
if (currentLineNumber <= resumingFrom) {
continue;
}
const trimmedLine = line.trim();
if (trimmedLine.length > 0) {
// Only take first word (no spaces or separators)
const firstWord = trimmedLine.split(/\s+/)[0];
if (firstWord) {
totalLines++;
currentBatch.push(firstWord);
if (currentBatch.length >= batchSize) {
await processBatch(currentBatch);
await processBatch(currentBatch, currentLineNumber);
currentBatch = [];
}
}
@@ -216,45 +430,77 @@ async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZ
}
// Process remaining items in last batch
if (currentBatch.length > 0) {
await processBatch(currentBatch);
if (currentBatch.length > 0 && !isInterrupted) {
await processBatch(currentBatch, currentLineNumber);
}
if (isInterrupted) {
return;
}
// Refresh index
console.log('\n\n🔄 Refreshing index...');
await client.indices.refresh({ index: INDEX_NAME });
const duration = ((Date.now() - startTime) / 1000).toFixed(2);
const rate = (indexed / parseFloat(duration)).toFixed(0);
// Delete state file on successful completion
deleteState(stateFile);
const duration = ((Date.now() - sessionStartTime) / 1000).toFixed(2);
const rate = sessionIndexed > 0 ? (sessionIndexed / parseFloat(duration)).toFixed(0) : '0';
console.log('\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━');
console.log('✅ Indexing complete!');
console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
console.log(`Total processed: ${totalLines}`);
console.log(`Successfully indexed: ${indexed}`);
console.log(`Skipped (duplicates): ${skipped}`);
console.log(`Errors: ${errors}`);
console.log(`Duration: ${duration}s`);
console.log(`Rate: ${rate} docs/sec`);
console.log(`Total lines processed: ${currentLineNumber}`);
if (resumingFrom > 0) {
console.log(`Lines skipped (resumed): ${resumingFrom}`);
console.log(`Lines processed this session: ${currentLineNumber - resumingFrom}`);
}
console.log(`Successfully indexed (total): ${state.indexed}`);
console.log(`Successfully indexed (session): ${sessionIndexed}`);
console.log(`Skipped duplicates (total): ${state.skipped}`);
console.log(`Skipped duplicates (session): ${sessionSkipped}`);
console.log(`Errors (total): ${state.errors}`);
console.log(`Session duration: ${duration}s`);
console.log(`Session rate: ${rate} docs/sec`);
console.log('');
} catch (error) {
console.error('\n❌ Error:', error instanceof Error ? error.message : error);
// Save state on error
saveState(stateFile, state);
console.error(`\n💾 State saved to ${stateFile}`);
console.error('❌ Error:', error instanceof Error ? error.message : error);
process.exit(1);
} finally {
// Remove signal handlers
process.removeListener('SIGINT', handleInterrupt);
process.removeListener('SIGTERM', handleInterrupt);
}
}
// Parse command line arguments
const args = process.argv.slice(2);
const parsedArgs = parseArgs(args);
if (args.length === 0 || args.includes('--help') || args.includes('-h')) {
if (parsedArgs.showHelp || !parsedArgs.filePath) {
showHelp();
}
const filePath = args[0];
const batchSizeIndex = args.indexOf('--batch-size');
const batchSize = batchSizeIndex !== -1 && args[batchSizeIndex + 1]
? parseInt(args[batchSizeIndex + 1], 10)
: DEFAULT_BATCH_SIZE;
const filePath = parsedArgs.filePath as string;
indexFile(filePath, batchSize).catch(console.error);
// Validate file exists
if (!existsSync(filePath)) {
console.error(`❌ File not found: ${filePath}`);
process.exit(1);
}
console.log(`\n🔧 Configuration:`);
console.log(` File: ${filePath}`);
console.log(` Batch size: ${parsedArgs.batchSize}`);
console.log(` Resume: ${parsedArgs.resume}`);
if (parsedArgs.stateFile) {
console.log(` State file: ${parsedArgs.stateFile}`);
}
console.log('');
indexFile(filePath, parsedArgs.batchSize, parsedArgs.resume, parsedArgs.stateFile).catch(console.error);