#!/usr/bin/env node

/**
 * Network Packet Capture and Elasticsearch Indexer
 * Captures network packets and indexes them to Elasticsearch
 */

const Cap = require('cap').Cap;
const decoders = require('cap').decoders;
const PROTOCOL = decoders.PROTOCOL;
const { Client } = require('@elastic/elasticsearch');
const os = require('os');
const config = require('./config');

// Initialize Elasticsearch client
const esClient = new Client({
  node: config.elasticsearch.node,
  auth: config.elasticsearch.auth
});

// Memory cache for documents whose indexing failed
const documentCache = [];
const maxCacheSize = config.cache.maxSize;
let esAvailable = true;
let lastESCheckTime = Date.now();
const ES_CHECK_INTERVAL = config.cache.checkInterval;

// Promise of the flush currently running, or null. Used to make flushCache()
// re-entrant-safe so overlapping callers never index the same document twice.
let activeFlush = null;

// Statistics tracking
const stats = {
  packetsProcessed: 0,
  packetsIndexed: 0,
  packetsSkipped: 0,
  contentSkipped: 0,
  cachedDocuments: 0,
  cacheOverflows: 0,
  errors: 0,
  startTime: Date.now()
};

/**
 * Logger utility. Output is gated on config.logging.level; errors are always
 * printed.
 */
const logger = {
  debug: (...args) => config.logging.level === 'debug' && console.log('[DEBUG]', ...args),
  info: (...args) => ['debug', 'info'].includes(config.logging.level) && console.log('[INFO]', ...args),
  warn: (...args) => ['debug', 'info', 'warn'].includes(config.logging.level) && console.warn('[WARN]', ...args),
  error: (...args) => console.error('[ERROR]', ...args)
};

/**
 * Get network interface information.
 *
 * @param {string} interfaceName - Key into os.networkInterfaces().
 * @returns {{name: string, ip: (string|null), mac: (string|null)}|null}
 *   null when the interface does not exist; ip/mac are null when the
 *   interface has no IPv4 address entry.
 */
function getInterfaceInfo(interfaceName) {
  const addresses = os.networkInterfaces()[interfaceName];
  if (!addresses) {
    return null;
  }

  // Find IPv4 address
  const ipv4 = addresses.find(addr => addr.family === 'IPv4');

  return {
    name: interfaceName,
    ip: ipv4 ? ipv4.address : null,
    mac: ipv4 ? ipv4.mac : null
  };
}

/**
 * Get all available network interfaces.
 *
 * @returns {string[]} Names of interfaces that have at least one
 *   non-internal IPv4 address.
 */
function getAvailableInterfaces() {
  const interfaces = os.networkInterfaces();

  // Filter out loopback and interfaces without IPv4
  return Object.keys(interfaces).filter(name =>
    interfaces[name].some(addr => addr.family === 'IPv4' && !addr.internal)
  );
}

/**
 * Build BPF filter string based on configuration.
 *
 * An explicit config.capture.filter wins; otherwise the filter is assembled
 * from config.filters and the individual clauses are joined with "and".
 *
 * @returns {string} The BPF expression, or '' when nothing is configured.
 */
function buildBPFFilter() {
  if (config.capture.filter) {
    return config.capture.filter;
  }

  const { protocols, excludePorts, excludePortRanges, includePorts } = config.filters;
  const filters = [];

  // Protocol filter
  if (protocols.length > 0) {
    const protoFilter = protocols.map(p => p.toLowerCase()).join(' or ');
    filters.push(`(${protoFilter})`);
  }

  // Port exclusion filter
  if (excludePorts.length > 0) {
    filters.push(...excludePorts.map(port => `not port ${port}`));
  }

  // Port range exclusion filter
  if (excludePortRanges.length > 0) {
    filters.push(...excludePortRanges.map(([start, end]) => `not portrange ${start}-${end}`));
  }

  // Port inclusion filter
  if (includePorts.length > 0) {
    const includeFilter = includePorts.map(port => `port ${port}`).join(' or ');
    filters.push(`(${includeFilter})`);
  }

  return filters.length > 0 ? filters.join(' and ') : '';
}

/**
 * Check if content is ASCII/readable.
 *
 * Samples at most the first 100 bytes and counts printable ASCII plus
 * tab/LF/CR.
 *
 * @param {Buffer} buffer
 * @returns {boolean} true when more than 70% of the sampled bytes are printable.
 */
function isReadableContent(buffer) {
  if (!buffer || buffer.length === 0) {
    return false;
  }

  const sampleSize = Math.min(buffer.length, 100);
  let readableChars = 0;

  for (let i = 0; i < sampleSize; i++) {
    const byte = buffer[i];
    const isPrintable = byte >= 32 && byte <= 126;
    const isWhitespace = byte === 9 || byte === 10 || byte === 13;
    if (isPrintable || isWhitespace) {
      readableChars++;
    }
  }

  return (readableChars / sampleSize) > 0.7;
}

/**
 * Extract readable content from buffer.
 *
 * @param {Buffer} buffer - Payload bytes.
 * @param {number} maxSize - Payloads longer than this are skipped.
 * @returns {string|null} UTF-8 text, or null when the payload is empty,
 *   content indexing is disabled, the payload is too large, or it does not
 *   look like text.
 */
function extractContent(buffer, maxSize) {
  if (!buffer || buffer.length === 0) {
    return null;
  }

  // Checked before the size test so that the "too large" counter is not
  // bumped for payloads that would never have been indexed anyway.
  if (!config.content.indexReadableContent) {
    return null;
  }

  // Skip if too large
  if (buffer.length > maxSize) {
    stats.contentSkipped++;
    return null;
  }

  if (!isReadableContent(buffer)) {
    return null;
  }

  try {
    return buffer.toString('utf8');
  } catch (e) {
    logger.debug('Failed to convert buffer to string:', e.message);
    return null;
  }
}

/**
 * Add document to cache, evicting the oldest entry when the cache is full.
 *
 * @param {object} document - Packet document that could not be indexed.
 */
function addToCache(document) {
  if (documentCache.length >= maxCacheSize) {
    // Remove oldest document if cache is full
    documentCache.shift();
    stats.cacheOverflows++;
    logger.warn(`Cache overflow: removed oldest document (cache size: ${maxCacheSize})`);
  }

  documentCache.push(document);
  stats.cachedDocuments = documentCache.length;
  logger.debug(`Document added to cache (total: ${documentCache.length})`);
}

/**
 * Index the cached documents one by one, oldest first, stopping at the first
 * failure. Must only be called through flushCache().
 *
 * @returns {Promise<boolean>} true when at least one document was flushed.
 */
async function drainCache() {
  if (documentCache.length === 0) {
    return false;
  }

  logger.info(`Attempting to flush ${documentCache.length} cached documents...`);

  // Work on a snapshot: addToCache() may push or evict while we await.
  const documentsToFlush = [...documentCache];
  let flushedCount = 0;

  for (const document of documentsToFlush) {
    try {
      await esClient.index({
        index: config.elasticsearch.index,
        document: document
      });
    } catch (error) {
      logger.debug(`Failed to flush cached document: ${error.message}`);
      // Stop trying if ES is still unavailable
      break;
    }

    // Remove from cache on success. The document is normally at the front;
    // fall back to a search because an overflow eviction may have shifted
    // the cache while the request was in flight.
    const position = documentCache[0] === document ? 0 : documentCache.indexOf(document);
    if (position !== -1) {
      documentCache.splice(position, 1);
    }
    flushedCount++;
    stats.packetsIndexed++;
  }

  stats.cachedDocuments = documentCache.length;

  if (flushedCount > 0) {
    logger.info(`Successfully flushed ${flushedCount} documents. Remaining in cache: ${documentCache.length}`);
  }

  return flushedCount > 0;
}

/**
 * Try to flush cached documents to Elasticsearch.
 *
 * Only one flush runs at a time: callers that arrive while a flush is in
 * progress receive the promise of that flush instead of starting a second
 * pass over the same documents (which would index them twice).
 *
 * @returns {Promise<boolean>} true when at least one document was flushed.
 */
function flushCache() {
  if (activeFlush === null) {
    activeFlush = drainCache().finally(() => {
      activeFlush = null;
    });
  }
  return activeFlush;
}

/**
 * Check Elasticsearch availability by pinging it, updating esAvailable and
 * flushing the cache when the connection comes back.
 *
 * @returns {Promise<boolean>} true when the ping succeeded.
 */
async function checkESAvailability() {
  try {
    await esClient.ping();

    if (!esAvailable) {
      logger.info('Elasticsearch connection restored!');
      esAvailable = true;
      // Try to flush cache
      await flushCache();
    }

    return true;
  } catch (error) {
    if (esAvailable) {
      logger.error('Elasticsearch connection lost!');
      esAvailable = false;
    }
    return false;
  }
}

/**
 * Index document to Elasticsearch with cache fallback.
 *
 * On failure the document is cached, esAvailable is cleared and the error
 * counter is bumped; the function itself does not reject for index errors.
 *
 * @param {object} document - Packet document to index.
 * @returns {Promise<void>}
 */
async function indexDocument(document) {
  // First, try to flush cache if we have pending documents
  if (documentCache.length > 0 && esAvailable) {
    const now = Date.now();
    if (now - lastESCheckTime > ES_CHECK_INTERVAL) {
      // Record the attempt before awaiting so packets that arrive while the
      // flush is running do not each trigger another flush.
      lastESCheckTime = now;
      await flushCache();
    }
  }

  try {
    await esClient.index({
      index: config.elasticsearch.index,
      document: document
    });
    stats.packetsIndexed++;
    esAvailable = true;
    logger.debug('Document indexed successfully');
  } catch (error) {
    logger.warn(`Failed to index document: ${error.message}. Adding to cache.`);
    esAvailable = false;
    addToCache(document);
    stats.errors++;
  }
}

/**
 * Record the transport payload found in buffer[start, end) on the packet
 * document: readable payloads get `content`, everything else is tagged as
 * binary; `content_length` is set in both cases.
 *
 * @param {object} packet - Document being built (mutated).
 * @param {Buffer} buffer - Captured frame.
 * @param {number} start - Offset of the first payload byte.
 * @param {number} end - Offset one past the last payload byte.
 */
function attachPayload(packet, buffer, start, end) {
  if (start >= end) {
    return;
  }

  const payload = buffer.subarray(start, end);
  const content = extractContent(payload, config.content.maxContentSize);

  packet.content_length = payload.length;
  if (content) {
    packet.content = content;
  } else {
    packet.content_type = 'binary';
  }
}

/**
 * Parse and index a packet.
 *
 * All decoding happens synchronously before the first await, so the caller
 * may hand in a view onto a buffer that is reused for the next packet.
 *
 * @param {Buffer} buffer - Captured Ethernet frame.
 * @param {{name: string, ip: (string|null), mac: (string|null)}} interfaceInfo
 * @returns {Promise<void>}
 */
async function processPacket(buffer, interfaceInfo) {
  stats.packetsProcessed++;

  try {
    // Decode Ethernet layer
    const ret = decoders.Ethernet(buffer);

    if (!ret || !ret.info) {
      stats.packetsSkipped++;
      return;
    }

    // One timestamp for both fields so they can never disagree.
    const timestamp = new Date().toISOString();

    const packet = {
      '@timestamp': timestamp,
      date: timestamp,
      interface: {
        name: interfaceInfo.name,
        ip: interfaceInfo.ip,
        mac: interfaceInfo.mac
      },
      ethernet: {
        src: ret.info.srcmac,
        dst: ret.info.dstmac,
        type: ret.info.type
      }
    };

    if (ret.info.type === PROTOCOL.ETHERNET.IPV4) {
      // Decode IP layer
      const ipRet = decoders.IPV4(buffer, ret.offset);

      if (ipRet) {
        packet.ip = {
          version: 4,
          src: ipRet.info.srcaddr,
          dst: ipRet.info.dstaddr,
          protocol: ipRet.info.protocol,
          ttl: ipRet.info.ttl,
          length: ipRet.info.totallen
        };

        // Short Ethernet frames are padded, so the captured frame can be
        // longer than the IP datagram. Bound the payload by the IPv4 total
        // length so padding is not reported as payload. A non-positive total
        // length falls back to the captured length.
        // NOTE(review): assumes totallen is the RFC 791 Total Length field
        // counted from the start of the IP header (ret.offset) - confirm
        // against the cap decoder.
        const totalLength = ipRet.info.totallen;
        const ipEnd = totalLength > 0
          ? Math.min(buffer.length, ret.offset + totalLength)
          : buffer.length;

        if (ipRet.info.protocol === PROTOCOL.IP.TCP) {
          // Decode TCP
          const tcpRet = decoders.TCP(buffer, ipRet.offset);

          if (tcpRet) {
            const flags = tcpRet.info.flags;
            packet.tcp = {
              src_port: tcpRet.info.srcport,
              dst_port: tcpRet.info.dstport,
              flags: {
                syn: !!(flags & 0x02),
                ack: !!(flags & 0x10),
                fin: !!(flags & 0x01),
                rst: !!(flags & 0x04),
                psh: !!(flags & 0x08)
              },
              seq: tcpRet.info.seqno,
              ack_seq: tcpRet.info.ackno,
              window: tcpRet.info.window
            };

            // Extract payload
            attachPayload(packet, buffer, tcpRet.offset, ipEnd);
          }
        } else if (ipRet.info.protocol === PROTOCOL.IP.UDP) {
          // Decode UDP
          const udpRet = decoders.UDP(buffer, ipRet.offset);

          if (udpRet) {
            packet.udp = {
              src_port: udpRet.info.srcport,
              dst_port: udpRet.info.dstport,
              length: udpRet.info.length
            };

            // Extract payload
            attachPayload(packet, buffer, udpRet.offset, ipEnd);
          }
        } else if (ipRet.info.protocol === PROTOCOL.IP.ICMP) {
          // Handle ICMP
          packet.icmp = {
            protocol: 'icmp'
          };
        }
      }
    } else if (ret.info.type === PROTOCOL.ETHERNET.IPV6) {
      // Handle IPv6 (network layer only; transport headers are not decoded)
      const ipv6Ret = decoders.IPV6(buffer, ret.offset);

      if (ipv6Ret) {
        packet.ip = {
          version: 6,
          src: ipv6Ret.info.srcaddr,
          dst: ipv6Ret.info.dstaddr,
          protocol: ipv6Ret.info.protocol,
          hop_limit: ipv6Ret.info.hoplimit
        };
      }
    }

    // Index to Elasticsearch (with cache fallback)
    await indexDocument(packet);
  } catch (error) {
    stats.errors++;
    logger.error('Error processing packet:', error.message);
  }
}

/**
 * Setup packet capture for an interface.
 *
 * @param {string} interfaceName - Interface to capture on.
 * @returns {object|null} The opened Cap instance, or null when the interface
 *   is unusable or opening the capture failed.
 */
function setupCapture(interfaceName) {
  const interfaceInfo = getInterfaceInfo(interfaceName);

  if (!interfaceInfo || !interfaceInfo.ip) {
    logger.warn(`Interface ${interfaceName} not found or has no IPv4 address`);
    return null;
  }

  const cap = new Cap();
  const device = interfaceName;
  const filter = buildBPFFilter();
  const bufferSize = config.capture.bufferSize;

  // Keep our own reference to the packet buffer: cap writes each captured
  // packet into the buffer handed to open(), and the documented usage reads
  // it back from that same variable rather than from the Cap instance.
  const captureBuffer = Buffer.alloc(65535);

  try {
    const linkType = cap.open(device, filter, bufferSize, captureBuffer);

    logger.info(`Capturing on interface: ${interfaceName} (${interfaceInfo.ip})`);
    logger.info(`Promiscuous mode: ${config.capture.promiscuousMode ? 'enabled' : 'disabled'}`);
    if (filter) {
      logger.info(`BPF filter: ${filter}`);
    }
    logger.info(`Link type: ${linkType}`);

    if (linkType !== 'ETHERNET') {
      logger.warn(`Link type ${linkType} on ${interfaceName} is not ETHERNET; its packets will be ignored`);
    }

    // setMinBytes is only provided on Windows; calling it unconditionally
    // throws elsewhere and would abort the capture setup.
    if (typeof cap.setMinBytes === 'function') {
      cap.setMinBytes(0);
    }

    cap.on('packet', (nbytes, trunc) => {
      if (linkType !== 'ETHERNET') {
        return;
      }

      // A view is enough (no copy): processPacket finishes decoding before
      // its first await, i.e. before the next packet can overwrite the buffer.
      const frame = captureBuffer.subarray(0, nbytes);
      processPacket(frame, interfaceInfo).catch(err => {
        logger.error('Failed to process packet:', err.message);
      });
    });

    return cap;
  } catch (error) {
    logger.error(`Failed to setup capture on ${interfaceName}:`, error.message);
    return null;
  }
}

/**
 * Initialize Elasticsearch index with mapping. Creates the configured index
 * when it does not exist yet.
 *
 * @returns {Promise<void>}
 * @throws Rethrows any error from the existence check or index creation.
 */
async function initializeElasticsearch() {
  const index = config.elasticsearch.index;

  try {
    // Check if index exists
    const indexExists = await esClient.indices.exists({ index });

    if (indexExists) {
      logger.info(`Using existing Elasticsearch index: ${index}`);
      return;
    }

    logger.info(`Creating Elasticsearch index: ${index}`);

    // Mappings are passed at the top level, matching the `document:` style
    // used for indexing elsewhere in this file (the `body:` wrapper is the
    // legacy form).
    await esClient.indices.create({
      index,
      mappings: {
        properties: {
          '@timestamp': { type: 'date' },
          date: { type: 'date' },
          interface: {
            properties: {
              name: { type: 'keyword' },
              ip: { type: 'ip' },
              mac: { type: 'keyword' }
            }
          },
          ethernet: {
            properties: {
              src: { type: 'keyword' },
              dst: { type: 'keyword' },
              type: { type: 'integer' }
            }
          },
          ip: {
            properties: {
              version: { type: 'integer' },
              src: { type: 'ip' },
              dst: { type: 'ip' },
              protocol: { type: 'integer' },
              ttl: { type: 'integer' },
              length: { type: 'integer' },
              hop_limit: { type: 'integer' }
            }
          },
          tcp: {
            properties: {
              src_port: { type: 'integer' },
              dst_port: { type: 'integer' },
              flags: {
                properties: {
                  syn: { type: 'boolean' },
                  ack: { type: 'boolean' },
                  fin: { type: 'boolean' },
                  rst: { type: 'boolean' },
                  psh: { type: 'boolean' }
                }
              },
              seq: { type: 'long' },
              ack_seq: { type: 'long' },
              window: { type: 'integer' }
            }
          },
          udp: {
            properties: {
              src_port: { type: 'integer' },
              dst_port: { type: 'integer' },
              length: { type: 'integer' }
            }
          },
          icmp: {
            properties: {
              protocol: { type: 'keyword' }
            }
          },
          content: { type: 'text' },
          content_length: { type: 'integer' },
          content_type: { type: 'keyword' }
        }
      }
    });

    logger.info('Elasticsearch index created successfully');
  } catch (error) {
    logger.error('Failed to initialize Elasticsearch:', error.message);
    throw error;
  }
}

/**
 * Print statistics through the info logger.
 */
function printStats() {
  const uptime = Math.floor((Date.now() - stats.startTime) / 1000);
  // Always a two-decimal string so the log format does not change once the
  // first second has elapsed.
  const rate = (uptime > 0 ? stats.packetsProcessed / uptime : 0).toFixed(2);

  logger.info('=== Packet Capture Statistics ===');
  logger.info(`Uptime: ${uptime}s`);
  logger.info(`Packets processed: ${stats.packetsProcessed}`);
  logger.info(`Packets indexed: ${stats.packetsIndexed}`);
  logger.info(`Packets skipped: ${stats.packetsSkipped}`);
  logger.info(`Content skipped (too large): ${stats.contentSkipped}`);
  logger.info(`Cached documents: ${stats.cachedDocuments}`);
  logger.info(`Cache overflows: ${stats.cacheOverflows}`);
  logger.info(`Elasticsearch status: ${esAvailable ? 'connected' : 'disconnected'}`);
  logger.info(`Errors: ${stats.errors}`);
  logger.info(`Processing rate: ${rate} packets/sec`);
  logger.info('================================');
}

/**
 * Main function: prepares the index, opens a capture per interface, starts
 * the periodic statistics/availability timers and installs signal handlers.
 *
 * @returns {Promise<void>}
 */
async function main() {
  logger.info('Network Packet Capture Starting...');

  // Check for root/admin privileges
  if (process.getuid && process.getuid() !== 0) {
    logger.warn('Warning: Not running as root. Packet capture may fail.');
    logger.warn('Consider running with: sudo node index.js');
  }

  // Initialize Elasticsearch
  try {
    await initializeElasticsearch();
  } catch (error) {
    logger.error('Failed to initialize Elasticsearch. Exiting.');
    process.exit(1);
  }

  // Determine interfaces to capture
  let interfaces = config.capture.interfaces;
  if (interfaces.length === 0) {
    interfaces = getAvailableInterfaces();
    logger.info(`No interfaces specified. Using all available: ${interfaces.join(', ')}`);
  }

  if (interfaces.length === 0) {
    logger.error('No network interfaces available for capture');
    process.exit(1);
  }

  // Setup capture on each interface
  const captures = [];
  for (const iface of interfaces) {
    const cap = setupCapture(iface);
    if (cap) {
      captures.push(cap);
    }
  }

  if (captures.length === 0) {
    logger.error('Failed to setup capture on any interface');
    process.exit(1);
  }

  // Setup statistics reporting
  setInterval(printStats, config.logging.statsInterval * 1000);

  // Setup periodic ES availability check and cache flush
  setInterval(() => {
    checkESAvailability().catch(error => {
      logger.error('Elasticsearch availability check failed:', error.message);
    });
  }, ES_CHECK_INTERVAL);

  // Graceful shutdown, shared by SIGINT and SIGTERM
  let shuttingDown = false;
  const shutdown = async () => {
    // A second signal while we are still flushing must not restart shutdown.
    if (shuttingDown) {
      return;
    }
    shuttingDown = true;

    logger.info('\nShutting down...');

    // Stop capturing first so no new documents arrive during the final flush.
    for (const cap of captures) {
      try {
        cap.close();
      } catch (error) {
        logger.debug('Failed to close capture:', error.message);
      }
    }

    // Try to flush remaining cached documents
    if (documentCache.length > 0) {
      logger.info(`Attempting to flush ${documentCache.length} cached documents before exit...`);
      try {
        await flushCache();
        if (documentCache.length > 0) {
          logger.warn(`Warning: ${documentCache.length} documents remain in cache and will be lost`);
        }
      } catch (error) {
        logger.error('Failed to flush cache on shutdown:', error.message);
      }
    }

    printStats();
    process.exit(0);
  };

  process.on('SIGINT', shutdown);
  process.on('SIGTERM', shutdown);

  logger.info('Packet capture running. Press Ctrl+C to stop.');
}

// Run the application
main().catch(error => {
  logger.error('Fatal error:', error);
  process.exit(1);
});