index file stream big files

Signed-off-by: ale <ale@manalejandro.com>
This commit is contained in:
ale
2025-12-04 01:30:37 +01:00
parent 0b6d2f1436
commit 77b8f6dfc7

@@ -17,8 +17,9 @@
  */
 
 import { Client } from '@elastic/elasticsearch';
-import { readFileSync } from 'fs';
+import { createReadStream } from 'fs';
 import { resolve } from 'path';
+import { createInterface } from 'readline';
 import crypto from 'crypto';
 
 const ELASTICSEARCH_NODE = process.env.ELASTICSEARCH_NODE || 'http://localhost:9200';
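
The import swap above is the heart of the change: readFileSync pulled the whole wordlist into memory, while createReadStream plus readline's createInterface yield one line at a time. A minimal sketch of that pattern, with a hypothetical file path that is not part of the commit:

// Sketch only: counts non-empty lines without loading the file into memory.
import { createReadStream } from 'fs';
import { createInterface } from 'readline';

async function countLines(path: string): Promise<number> {
  const rl = createInterface({
    input: createReadStream(path, { encoding: 'utf-8' }),
    crlfDelay: Infinity // treat \r\n as a single line break, as the commit does
  });
  let count = 0;
  for await (const line of rl) {
    if (line.trim().length > 0) count++;
  }
  return count;
}

countLines('./wordlist.txt').then((n) => console.log(`non-empty lines: ${n}`));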
@@ -89,43 +90,46 @@ async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZ
     await client.cluster.health({});
     console.log('✅ Connected successfully\n');
 
-    // Read file
-    console.log('📖 Reading file...');
+    // Process file line by line using streams
+    console.log('📖 Processing file...\n');
     const absolutePath = resolve(filePath);
-    const content = readFileSync(absolutePath, 'utf-8');
-    const lines = content.split('\n')
-      .map(line => line.trim())
-      .filter(line => line.length > 0);
-    console.log(`✅ Found ${lines.length} words/phrases to process\n`);
+    let totalLines = 0;
 
-    // Process in batches
     let indexed = 0;
     let skipped = 0;
     let errors = 0;
     const startTime = Date.now();
-    for (let i = 0; i < lines.length; i += batchSize) {
-      const batch = lines.slice(i, i + batchSize);
+    let currentBatch: string[] = [];
+
+    const fileStream = createReadStream(absolutePath, { encoding: 'utf-8' });
+    const rl = createInterface({
+      input: fileStream,
+      crlfDelay: Infinity
+    });
+
+    const processBatch = async (batch: string[]) => {
+      if (batch.length === 0) return;
       const bulkOperations: any[] = [];
 
       // Generate hashes for all items in batch first
       const batchWithHashes = await Promise.all(
-        batch.map(async (plaintext) => ({
+        batch.map(async (plaintext: string) => ({
           plaintext,
           hashes: await generateHashes(plaintext)
         }))
       );
 
       // Check which items already exist (by plaintext or any hash)
-      const md5List = batchWithHashes.map(item => item.hashes.md5);
-      const sha1List = batchWithHashes.map(item => item.hashes.sha1);
-      const sha256List = batchWithHashes.map(item => item.hashes.sha256);
-      const sha512List = batchWithHashes.map(item => item.hashes.sha512);
+      const md5List = batchWithHashes.map((item: any) => item.hashes.md5);
+      const sha1List = batchWithHashes.map((item: any) => item.hashes.sha1);
+      const sha256List = batchWithHashes.map((item: any) => item.hashes.sha256);
+      const sha512List = batchWithHashes.map((item: any) => item.hashes.sha512);
 
       const existingCheck = await client.search({
         index: INDEX_NAME,
-        size: batchSize * 5, // Account for potential multiple matches
+        size: batchSize * 5,
         query: {
           bool: {
             should: [
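
The hunk cuts off at the should clause, but the shape of the duplicate check is clear from what is shown: one bool/should search per batch covering the plaintext and all four hash lists. A sketch of the likely query body; the exact field names inside should and the minimum_should_match setting are assumptions, not visible in this diff:

// Assumed continuation of the search above; the field names in `should`
// are guesses based on md5List/sha1List/etc., not shown in the hunk.
const existingCheck = await client.search({
  index: INDEX_NAME,
  size: batchSize * 5,
  query: {
    bool: {
      should: [
        { terms: { 'plaintext.keyword': batch } }, // assumed keyword subfield
        { terms: { md5: md5List } },
        { terms: { sha1: sha1List } },
        { terms: { sha256: sha256List } },
        { terms: { sha512: sha512List } }
      ],
      minimum_should_match: 1 // assumed: any single match marks a duplicate
    }
  }
});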
@@ -185,15 +189,35 @@ async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZ
             indexed += bulkOperations.length / 2;
           }
         } catch (error) {
-          console.error(`\n❌ Error processing batch ${i}-${i + batchSize}:`, error);
+          console.error(`\n❌ Error processing batch:`, error);
           errors += bulkOperations.length / 2;
         }
       }
 
       // Progress indicator
-      const progress = Math.min(i + batchSize, lines.length);
-      const percent = ((progress / lines.length) * 100).toFixed(1);
-      process.stdout.write(`\r⏳ Progress: ${progress}/${lines.length} (${percent}%) - Indexed: ${indexed}, Skipped: ${skipped}, Errors: ${errors}`);
+      process.stdout.write(`\r⏳ Processed: ${totalLines} - Indexed: ${indexed}, Skipped: ${skipped}, Errors: ${errors}`);
+    };
+
+    for await (const line of rl) {
+      const trimmedLine = line.trim();
+      if (trimmedLine.length > 0) {
+        // Only take first word (no spaces or separators)
+        const firstWord = trimmedLine.split(/\s+/)[0];
+        if (firstWord) {
+          totalLines++;
+          currentBatch.push(firstWord);
+
+          if (currentBatch.length >= batchSize) {
+            await processBatch(currentBatch);
+            currentBatch = [];
+          }
+        }
+      }
+    }
+
+    // Process remaining items in last batch
+    if (currentBatch.length > 0) {
+      await processBatch(currentBatch);
     }
 
     // Refresh index
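
The loop above is the standard accumulate-and-flush pattern: push items into a batch, flush when it reaches batchSize, then flush whatever remains after the iterator ends. The same logic in a generic, reusable form (a sketch, not code from this repo):

// Generic batch-and-flush over any async iterable source.
async function inBatches<T>(
  source: AsyncIterable<T>,
  size: number,
  flush: (batch: T[]) => Promise<void>
): Promise<void> {
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length >= size) {
      await flush(batch);
      batch = []; // start a fresh array; flush keeps the old one
    }
  }
  if (batch.length > 0) await flush(batch); // flush the partial tail
}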
@@ -206,7 +230,7 @@ async function indexFile(filePath: string, batchSize: number = DEFAULT_BATCH_SIZ
     console.log('\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━');
     console.log('✅ Indexing complete!');
     console.log(`━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━`);
-    console.log(`Total processed: ${lines.length}`);
+    console.log(`Total processed: ${totalLines}`);
     console.log(`Successfully indexed: ${indexed}`);
     console.log(`Skipped (duplicates): ${skipped}`);
     console.log(`Errors: ${errors}`);
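
One detail worth spelling out: the counters divide bulkOperations.length by 2 because the Elasticsearch bulk API takes alternating action and document entries, so each indexed word contributes two array elements. Roughly, with the document shape and the newItems variable assumed from the surrounding diff:

// Sketch of how bulkOperations is likely assembled; `newItems` and the
// document fields are assumptions based on this diff, not confirmed code.
for (const item of newItems) {
  bulkOperations.push({ index: { _index: INDEX_NAME } }); // action entry
  bulkOperations.push({ plaintext: item.plaintext, ...item.hashes }); // source doc
}
const bulkResponse = await client.bulk({ operations: bulkOperations });
// Two entries per document, so documents indexed = bulkOperations.length / 2.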