Files
netpcap/index.js
ale 9e5479dd8c fix geoip
Signed-off-by: ale <ale@manalejandro.com>
2026-02-12 00:36:42 +01:00

937 líneas
26 KiB
JavaScript

#!/usr/bin/env node
/**
* Network Packet Capture and Elasticsearch Indexer
* Captures network packets and indexes them to Elasticsearch
*/
const Cap = require('cap').Cap;
const decoders = require('cap').decoders;
const PROTOCOL = decoders.PROTOCOL;
const { Client } = require('@elastic/elasticsearch');
const os = require('os');
const https = require('https');
const geoip = require('geoip-lite');
const config = require('./config');
// Initialize Elasticsearch client
const esClient = new Client({
node: config.elasticsearch.node,
auth: config.elasticsearch.auth
});
// Memory cache for failed indexing
const documentCache = [];
let maxCacheSize = config.cache.maxSize;
let esAvailable = true;
let lastESCheckTime = Date.now();
const ES_CHECK_INTERVAL = config.cache.checkInterval;
// Public IP detection
let publicIP = null;
let publicIPGeoData = null;
// Statistics tracking
const stats = {
packetsProcessed: 0,
packetsIndexed: 0,
packetsSkipped: 0,
contentSkipped: 0,
cachedDocuments: 0,
cacheOverflows: 0,
errors: 0,
startTime: Date.now()
};
/**
* Logger utility
*/
const logger = {
debug: (...args) => config.logging.level === 'debug' && console.log('[DEBUG]', ...args),
info: (...args) => ['debug', 'info'].includes(config.logging.level) && console.log('[INFO]', ...args),
warn: (...args) => ['debug', 'info', 'warn'].includes(config.logging.level) && console.warn('[WARN]', ...args),
error: (...args) => console.error('[ERROR]', ...args)
};
/**
* Get network interface information
*/
function getInterfaceInfo(interfaceName) {
const interfaces = os.networkInterfaces();
const iface = interfaces[interfaceName];
if (!iface) return null;
// Find IPv4 address
const ipv4 = iface.find(addr => addr.family === 'IPv4');
return {
name: interfaceName,
ip: ipv4 ? ipv4.address : null,
mac: ipv4 ? ipv4.mac : null
};
}
/**
* Get all available network interfaces
*/
function getAvailableInterfaces() {
const interfaces = os.networkInterfaces();
return Object.keys(interfaces).filter(name => {
const iface = interfaces[name];
// Filter out loopback and interfaces without IPv4
return iface.some(addr => addr.family === 'IPv4' && !addr.internal);
});
}
/**
* Build BPF filter string based on configuration
*/
function buildBPFFilter() {
if (config.capture.filter) {
return config.capture.filter;
}
const filters = [];
// Protocol filter
if (config.filters.protocols.length > 0) {
const protoFilter = config.filters.protocols.map(p => p.toLowerCase()).join(' or ');
filters.push(`(${protoFilter})`);
}
// Port exclusion filter
if (config.filters.excludePorts.length > 0) {
const portFilters = config.filters.excludePorts.map(port =>
`not port ${port}`
);
filters.push(...portFilters);
}
// Port range exclusion filter
if (config.filters.excludePortRanges.length > 0) {
const rangeFilters = config.filters.excludePortRanges.map(([start, end]) =>
`not portrange ${start}-${end}`
);
filters.push(...rangeFilters);
}
// Port inclusion filter (takes precedence)
if (config.filters.includePorts.length > 0) {
const includeFilter = config.filters.includePorts.map(port =>
`port ${port}`
).join(' or ');
filters.push(`(${includeFilter})`);
}
return filters.length > 0 ? filters.join(' and ') : '';
}
/**
* Get public IP address from external service
*/
async function getPublicIP() {
return new Promise((resolve, reject) => {
https.get('https://api.ipify.org?format=json', (res) => {
let data = '';
res.on('data', (chunk) => {
data += chunk;
});
res.on('end', () => {
try {
const json = JSON.parse(data);
resolve(json.ip);
} catch (error) {
reject(error);
}
});
}).on('error', (error) => {
reject(error);
});
});
}
/**
* Check if IP address is private/local
*/
function isPrivateIP(ip) {
if (!ip) return true;
// IPv4 private ranges
const ipv4PrivateRanges = [
/^10\./, // 10.0.0.0/8
/^172\.(1[6-9]|2[0-9]|3[0-1])\./, // 172.16.0.0/12
/^192\.168\./, // 192.168.0.0/16
/^127\./, // Loopback
/^169\.254\./, // Link-local
/^0\./ // Invalid
];
// Check IPv4 private ranges
if (ipv4PrivateRanges.some(regex => regex.test(ip))) {
return true;
}
// IPv6 local addresses
if (ip.includes(':')) {
if (ip.startsWith('fe80:') || // Link-local
ip.startsWith('fc00:') || // Unique local
ip.startsWith('fd00:') || // Unique local
ip === '::1') { // Loopback
return true;
}
}
return false;
}
/**
* Check if IP is remote (not local, not private)
* Now includes the public IP for geolocation
*/
function isRemoteIP(ip) {
if (!ip || isPrivateIP(ip)) return false;
return true;
}
/**
* Get GeoIP information for an IP address
*/
function getGeoIPData(ip) {
if (!ip || isPrivateIP(ip)) {
return null;
}
// Return cached GeoIP data for our public IP
if (publicIP && ip === publicIP && publicIPGeoData) {
return publicIPGeoData;
}
try {
const geo = geoip.lookup(ip);
if (!geo) return null;
return {
country: geo.country || null,
country_name: geo.country || null,
region: geo.region || null,
city: geo.city || null,
timezone: geo.timezone || null,
location: geo.ll ? {
lat: geo.ll[0],
lon: geo.ll[1]
} : null,
coordinates: geo.ll ? [geo.ll[1], geo.ll[0]] : null // [lon, lat] for Elasticsearch
};
} catch (error) {
logger.debug(`Failed to get GeoIP data for ${ip}:`, error.message);
return null;
}
}
/**
* Validate and format IP address for Elasticsearch
* Returns null if invalid
*/
function validateIPAddress(ipString) {
if (!ipString || typeof ipString !== 'string') return null;
// IPv4 validation regex
const ipv4Regex = /^(\d{1,3}\.){3}\d{1,3}$/;
// IPv6 validation - check if it contains colons and valid hex characters
const ipv6Regex = /^([0-9a-fA-F]{0,4}:){2,7}[0-9a-fA-F]{0,4}$/;
// Check if it's a valid IPv4
if (ipv4Regex.test(ipString)) {
const parts = ipString.split('.');
const allValid = parts.every(part => {
const num = parseInt(part, 10);
return num >= 0 && num <= 255;
});
return allValid ? ipString : null;
}
// Check if it's a valid IPv6
if (ipv6Regex.test(ipString)) {
return ipString;
}
// Try to convert MAC-like format to IPv6 (fe:80:... -> fe80::...)
// Some decoders might return IPv6 in an incorrect format
if (ipString.includes(':') && ipString.length > 20) {
// This might be a malformed IPv6, try to detect and skip
logger.debug(`Skipping malformed IP address: ${ipString}`);
return null;
}
return null;
}
/**
* Check if content is ASCII/readable
*/
function isReadableContent(buffer) {
if (!buffer || buffer.length === 0) return false;
let readableChars = 0;
const sampleSize = Math.min(buffer.length, 100); // Sample first 100 bytes
for (let i = 0; i < sampleSize; i++) {
const byte = buffer[i];
// Check for printable ASCII characters and common whitespace
if ((byte >= 32 && byte <= 126) || byte === 9 || byte === 10 || byte === 13) {
readableChars++;
}
}
// Consider readable if more than 70% are printable characters
return (readableChars / sampleSize) > 0.7;
}
/**
* Extract readable content from buffer
*/
function extractContent(buffer, maxSize) {
if (!buffer || buffer.length === 0) {
return null;
}
// Skip if too large
if (buffer.length > maxSize) {
stats.contentSkipped++;
return null;
}
if (!config.content.indexReadableContent) {
return null;
}
// Check if content is readable
if (isReadableContent(buffer)) {
try {
return buffer.toString('utf8', 0, Math.min(buffer.length, maxSize));
} catch (e) {
logger.debug('Failed to convert buffer to string:', e.message);
return null;
}
}
return null;
}
/**
* Add document to cache
*/
function addToCache(document) {
if (documentCache.length >= maxCacheSize) {
// Remove oldest document if cache is full
documentCache.shift();
stats.cacheOverflows++;
logger.warn(`Cache overflow: removed oldest document (cache size: ${maxCacheSize})`);
}
documentCache.push(document);
stats.cachedDocuments = documentCache.length;
logger.debug(`Document added to cache (total: ${documentCache.length})`);
}
/**
* Try to flush cached documents to Elasticsearch
*/
async function flushCache() {
if (documentCache.length === 0) return;
logger.info(`Attempting to flush ${documentCache.length} cached documents...`);
const documentsToFlush = [...documentCache];
let flushedCount = 0;
for (const document of documentsToFlush) {
try {
await esClient.index({
index: config.elasticsearch.index,
document: document
});
// Remove from cache on success
const index = documentCache.indexOf(document);
if (index > -1) {
documentCache.splice(index, 1);
}
flushedCount++;
stats.packetsIndexed++;
} catch (error) {
logger.debug(`Failed to flush cached document: ${error.message}`);
// Stop trying if ES is still unavailable
break;
}
}
stats.cachedDocuments = documentCache.length;
if (flushedCount > 0) {
logger.info(`Successfully flushed ${flushedCount} documents. Remaining in cache: ${documentCache.length}`);
}
return flushedCount > 0;
}
/**
* Check Elasticsearch availability
*/
async function checkESAvailability() {
try {
await esClient.ping();
if (!esAvailable) {
logger.info('Elasticsearch connection restored!');
esAvailable = true;
// Try to flush cache
await flushCache();
}
return true;
} catch (error) {
if (esAvailable) {
logger.error('Elasticsearch connection lost!');
esAvailable = false;
}
return false;
}
}
/**
* Index document to Elasticsearch with cache fallback
*/
async function indexDocument(document) {
// First, try to flush cache if we have pending documents
if (documentCache.length > 0 && esAvailable) {
const now = Date.now();
if (now - lastESCheckTime > ES_CHECK_INTERVAL) {
await flushCache();
lastESCheckTime = now;
}
}
try {
await esClient.index({
index: config.elasticsearch.index,
document: document
});
stats.packetsIndexed++;
esAvailable = true;
logger.debug('Document indexed successfully');
} catch (error) {
logger.warn(`Failed to index document: ${error.message}. Adding to cache.`);
esAvailable = false;
addToCache(document);
stats.errors++;
}
}
/**
* Parse and index a packet
*/
async function processPacket(buffer, interfaceInfo) {
stats.packetsProcessed++;
try {
// Decode Ethernet layer
const ret = decoders.Ethernet(buffer);
if (!ret || !ret.info) {
stats.packetsSkipped++;
return;
}
const packet = {
'@timestamp': new Date().toISOString(),
date: new Date().toISOString(),
interface: {
name: interfaceInfo.name,
ip: interfaceInfo.ip,
mac: interfaceInfo.mac
},
ethernet: {
src: ret.info.srcmac,
dst: ret.info.dstmac,
type: ret.info.type
}
};
// Decode IP layer
if (ret.info.type === PROTOCOL.ETHERNET.IPV4) {
const ipRet = decoders.IPV4(buffer, ret.offset);
if (ipRet) {
const srcAddr = validateIPAddress(ipRet.info.srcaddr);
const dstAddr = validateIPAddress(ipRet.info.dstaddr);
// Only add IP fields if addresses are valid
if (srcAddr && dstAddr) {
packet.ip = {
version: 4,
src: srcAddr,
dst: dstAddr,
protocol: ipRet.info.protocol,
ttl: ipRet.info.ttl,
length: ipRet.info.totallen
};
} else {
// Store raw addresses if validation fails
packet.ip_raw = {
version: 4,
src: ipRet.info.srcaddr,
dst: ipRet.info.dstaddr,
protocol: ipRet.info.protocol,
ttl: ipRet.info.ttl,
length: ipRet.info.totallen
};
logger.debug(`Invalid IPv4 addresses: ${ipRet.info.srcaddr} -> ${ipRet.info.dstaddr}`);
}
// Decode TCP
if (ipRet.info.protocol === PROTOCOL.IP.TCP) {
const tcpRet = decoders.TCP(buffer, ipRet.offset);
if (tcpRet) {
packet.tcp = {
src_port: tcpRet.info.srcport,
dst_port: tcpRet.info.dstport,
flags: {
syn: !!(tcpRet.info.flags & 0x02),
ack: !!(tcpRet.info.flags & 0x10),
fin: !!(tcpRet.info.flags & 0x01),
rst: !!(tcpRet.info.flags & 0x04),
psh: !!(tcpRet.info.flags & 0x08)
},
seq: tcpRet.info.seqno,
ack_seq: tcpRet.info.ackno,
window: tcpRet.info.window
};
// Extract payload
if (tcpRet.offset < buffer.length) {
const payload = buffer.slice(tcpRet.offset);
const content = extractContent(payload, config.content.maxContentSize);
if (content) {
packet.content = content;
packet.content_length = payload.length;
} else if (payload.length > 0) {
packet.content_length = payload.length;
packet.content_type = 'binary';
}
}
}
}
// Decode UDP
else if (ipRet.info.protocol === PROTOCOL.IP.UDP) {
const udpRet = decoders.UDP(buffer, ipRet.offset);
if (udpRet) {
packet.udp = {
src_port: udpRet.info.srcport,
dst_port: udpRet.info.dstport,
length: udpRet.info.length
};
// Extract payload
if (udpRet.offset < buffer.length) {
const payload = buffer.slice(udpRet.offset);
const content = extractContent(payload, config.content.maxContentSize);
if (content) {
packet.content = content;
packet.content_length = payload.length;
} else if (payload.length > 0) {
packet.content_length = payload.length;
packet.content_type = 'binary';
}
}
}
}
// Handle ICMP
else if (ipRet.info.protocol === PROTOCOL.IP.ICMP) {
packet.icmp = {
protocol: 'icmp'
};
}
}
}
// Handle IPv6
else if (ret.info.type === PROTOCOL.ETHERNET.IPV6) {
const ipv6Ret = decoders.IPV6(buffer, ret.offset);
if (ipv6Ret) {
const srcAddr = validateIPAddress(ipv6Ret.info.srcaddr);
const dstAddr = validateIPAddress(ipv6Ret.info.dstaddr);
// Only add IP fields if addresses are valid
if (srcAddr && dstAddr) {
packet.ip = {
version: 6,
src: srcAddr,
dst: dstAddr,
protocol: ipv6Ret.info.protocol,
hop_limit: ipv6Ret.info.hoplimit
};
} else {
// Store raw addresses if validation fails
packet.ip_raw = {
version: 6,
src: ipv6Ret.info.srcaddr,
dst: ipv6Ret.info.dstaddr,
protocol: ipv6Ret.info.protocol,
hop_limit: ipv6Ret.info.hoplimit
};
logger.debug(`Invalid IPv6 addresses: ${ipv6Ret.info.srcaddr} -> ${ipv6Ret.info.dstaddr}`);
}
}
}
// Add GeoIP data for remote IPs
if (packet.ip && packet.ip.src && packet.ip.dst) {
// Get GeoIP for source IP
const srcGeoIP = getGeoIPData(packet.ip.src);
if (srcGeoIP) {
packet.geoip_src = srcGeoIP;
}
// Get GeoIP for destination IP
const dstGeoIP = getGeoIPData(packet.ip.dst);
if (dstGeoIP) {
packet.geoip_dst = dstGeoIP;
}
}
// Index to Elasticsearch (with cache fallback)
await indexDocument(packet);
} catch (error) {
stats.errors++;
logger.error('Error processing packet:', error.message);
}
}
/**
* Setup packet capture for an interface
*/
function setupCapture(interfaceName) {
const interfaceInfo = getInterfaceInfo(interfaceName);
if (!interfaceInfo || !interfaceInfo.ip) {
logger.warn(`Interface ${interfaceName} not found or has no IPv4 address`);
return null;
}
const cap = new Cap();
const device = interfaceName;
const filter = buildBPFFilter();
const bufferSize = config.capture.bufferSize;
const buffer = Buffer.alloc(65535);
try {
const linkType = cap.open(device, filter, bufferSize, buffer);
logger.info(`Capturing on interface: ${interfaceName} (${interfaceInfo.ip})`);
logger.info(`Promiscuous mode: ${config.capture.promiscuousMode ? 'enabled' : 'disabled'}`);
if (filter) {
logger.info(`BPF filter: ${filter}`);
}
logger.info(`Link type: ${linkType}`);
cap.on('packet', (nbytes, trunc) => {
if (linkType === 'ETHERNET') {
const packetData = buffer.slice(0, nbytes);
processPacket(packetData, interfaceInfo).catch(err => {
logger.error('Failed to process packet:', err.message);
});
}
});
return cap;
} catch (error) {
logger.error(`Failed to setup capture on ${interfaceName}:`, error.message);
return null;
}
}
/**
* Initialize Elasticsearch index with mapping
*/
async function initializeElasticsearch() {
try {
// Check if index exists
const indexExists = await esClient.indices.exists({
index: config.elasticsearch.index
});
if (!indexExists) {
logger.info(`Creating Elasticsearch index: ${config.elasticsearch.index}`);
await esClient.indices.create({
index: config.elasticsearch.index,
body: {
mappings: {
properties: {
'@timestamp': { type: 'date' },
date: { type: 'date' },
interface: {
properties: {
name: { type: 'keyword' },
ip: { type: 'ip' },
mac: { type: 'keyword' }
}
},
ethernet: {
properties: {
src: { type: 'keyword' },
dst: { type: 'keyword' },
type: { type: 'integer' }
}
},
ip: {
properties: {
version: { type: 'integer' },
src: { type: 'ip' },
dst: { type: 'ip' },
protocol: { type: 'integer' },
ttl: { type: 'integer' },
length: { type: 'integer' },
hop_limit: { type: 'integer' }
}
},
ip_raw: {
properties: {
version: { type: 'integer' },
src: { type: 'keyword' },
dst: { type: 'keyword' },
protocol: { type: 'integer' },
ttl: { type: 'integer' },
length: { type: 'integer' },
hop_limit: { type: 'integer' }
}
},
tcp: {
properties: {
src_port: { type: 'integer' },
dst_port: { type: 'integer' },
flags: {
properties: {
syn: { type: 'boolean' },
ack: { type: 'boolean' },
fin: { type: 'boolean' },
rst: { type: 'boolean' },
psh: { type: 'boolean' }
}
},
seq: { type: 'long' },
ack_seq: { type: 'long' },
window: { type: 'integer' }
}
},
udp: {
properties: {
src_port: { type: 'integer' },
dst_port: { type: 'integer' },
length: { type: 'integer' }
}
},
icmp: {
properties: {
protocol: { type: 'keyword' }
}
},
geoip_src: {
properties: {
country: { type: 'keyword' },
country_name: { type: 'keyword' },
region: { type: 'keyword' },
city: { type: 'keyword' },
timezone: { type: 'keyword' },
location: { type: 'geo_point' },
coordinates: { type: 'geo_point' }
}
},
geoip_dst: {
properties: {
country: { type: 'keyword' },
country_name: { type: 'keyword' },
region: { type: 'keyword' },
city: { type: 'keyword' },
timezone: { type: 'keyword' },
location: { type: 'geo_point' },
coordinates: { type: 'geo_point' }
}
},
content: { type: 'text' },
content_length: { type: 'integer' },
content_type: { type: 'keyword' }
}
}
}
});
logger.info('Elasticsearch index created successfully');
} else {
logger.info(`Using existing Elasticsearch index: ${config.elasticsearch.index}`);
}
} catch (error) {
logger.error('Failed to initialize Elasticsearch:', error.message);
throw error;
}
}
/**
* Print statistics
*/
function printStats() {
const uptime = Math.floor((Date.now() - stats.startTime) / 1000);
const rate = uptime > 0 ? (stats.packetsProcessed / uptime).toFixed(2) : 0;
logger.info('=== Packet Capture Statistics ===');
logger.info(`Uptime: ${uptime}s`);
logger.info(`Packets processed: ${stats.packetsProcessed}`);
logger.info(`Packets indexed: ${stats.packetsIndexed}`);
logger.info(`Packets skipped: ${stats.packetsSkipped}`);
logger.info(`Content skipped (too large): ${stats.contentSkipped}`);
logger.info(`Cached documents: ${stats.cachedDocuments}`);
logger.info(`Cache overflows: ${stats.cacheOverflows}`);
logger.info(`Elasticsearch status: ${esAvailable ? 'connected' : 'disconnected'}`);
logger.info(`Errors: ${stats.errors}`);
logger.info(`Processing rate: ${rate} packets/sec`);
logger.info('================================');
}
/**
* Main function
*/
async function main() {
logger.info('Network Packet Capture Starting...');
// Check for root/admin privileges
if (process.getuid && process.getuid() !== 0) {
logger.warn('Warning: Not running as root. Packet capture may fail.');
logger.warn('Consider running with: sudo node index.js');
}
// Initialize Elasticsearch
try {
await initializeElasticsearch();
} catch (error) {
logger.error('Failed to initialize Elasticsearch. Exiting.');
process.exit(1);
}
// Get public IP address and GeoIP data
try {
publicIP = await getPublicIP();
logger.info(`Detected public IP: ${publicIP}`);
// Get GeoIP data for our public IP
if (publicIP) {
publicIPGeoData = getGeoIPData(publicIP);
if (publicIPGeoData) {
logger.info(`Public IP geolocation: ${publicIPGeoData.city || 'Unknown'}, ${publicIPGeoData.country || 'Unknown'}`);
} else {
logger.warn('Could not determine geolocation for public IP');
}
}
} catch (error) {
logger.warn('Failed to detect public IP address:', error.message);
logger.warn('GeoIP will be applied to all non-private IPs');
}
// Determine interfaces to capture
let interfaces = config.capture.interfaces;
if (interfaces.length === 0) {
interfaces = getAvailableInterfaces();
logger.info(`No interfaces specified. Using all available: ${interfaces.join(', ')}`);
}
if (interfaces.length === 0) {
logger.error('No network interfaces available for capture');
process.exit(1);
}
// Setup capture on each interface
const captures = [];
for (const iface of interfaces) {
const cap = setupCapture(iface);
if (cap) {
captures.push(cap);
}
}
if (captures.length === 0) {
logger.error('Failed to setup capture on any interface');
process.exit(1);
}
// Setup statistics reporting
setInterval(printStats, config.logging.statsInterval * 1000);
// Setup periodic ES availability check and cache flush
setInterval(async () => {
await checkESAvailability();
}, ES_CHECK_INTERVAL);
// Graceful shutdown
process.on('SIGINT', async () => {
logger.info('\nShutting down...');
// Try to flush remaining cached documents
if (documentCache.length > 0) {
logger.info(`Attempting to flush ${documentCache.length} cached documents before exit...`);
try {
await flushCache();
if (documentCache.length > 0) {
logger.warn(`Warning: ${documentCache.length} documents remain in cache and will be lost`);
}
} catch (error) {
logger.error('Failed to flush cache on shutdown:', error.message);
}
}
printStats();
process.exit(0);
});
process.on('SIGTERM', async () => {
logger.info('\nShutting down...');
// Try to flush remaining cached documents
if (documentCache.length > 0) {
logger.info(`Attempting to flush ${documentCache.length} cached documents before exit...`);
try {
await flushCache();
if (documentCache.length > 0) {
logger.warn(`Warning: ${documentCache.length} documents remain in cache and will be lost`);
}
} catch (error) {
logger.error('Failed to flush cache on shutdown:', error.message);
}
}
printStats();
process.exit(0);
});
logger.info('Packet capture running. Press Ctrl+C to stop.');
}
// Run the application
main().catch(error => {
logger.error('Fatal error:', error);
process.exit(1);
});