deleted logs and WebRTC issue

Signed-off-by: ale <ale@manalejandro.com>
Este commit está contenido en:
ale
2025-06-15 14:04:28 +02:00
padre 7c3921cdd3
commit e238b4b307
Se han modificado 7 ficheros con 335 adiciones y 114 borrados

Ver fichero

@@ -136,6 +136,7 @@ Once a node is running, you can use these commands:
- `send <message>` - Send a message through the ring - `send <message>` - Send a message through the ring
- `info` - Show network information - `info` - Show network information
- `peers` - List connected peers - `peers` - List connected peers
- `connections` - Show persistent connection status
- `help` - Show available commands - `help` - Show available commands
- `quit` - Exit the node - `quit` - Exit the node
@@ -319,3 +320,35 @@ MIT License - see LICENSE file for details.
- [Distributed Systems](https://en.wikipedia.org/wiki/Distributed_computing) - [Distributed Systems](https://en.wikipedia.org/wiki/Distributed_computing)
- [WebRTC Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebRTC_API) - [WebRTC Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebRTC_API)
- [Consensus Algorithms](https://en.wikipedia.org/wiki/Consensus_(computer_science)) - [Consensus Algorithms](https://en.wikipedia.org/wiki/Consensus_(computer_science))
### Connection Persistence
The Ring Network now includes connection persistence features:
#### Persistent Connection Management
- **Automatic Connection Maintenance**: Nodes automatically maintain persistent WebRTC connections with other nodes
- **Heartbeat System**: Regular heartbeat messages keep connections alive and detect failures
- **Connection Monitoring**: Built-in monitoring of connection health and status
- **Automatic Reconnection**: Failed connections are automatically retried with exponential backoff
#### Connection Status
Use the `connections` command to view:
- Total active connections
- Persistent connection count
- Connection health status
- Last seen timestamps for each peer
- Reconnection attempt counts
#### Bootstrap Connections
When joining via a bootstrap node:
- Initial WebSocket connection for joining the network
- Automatic establishment of persistent WebRTC connection
- Bootstrap node maintains connection with new nodes
- Connection health monitoring and automatic recovery
#### Features
-**Heartbeat Messages**: 30-second intervals to maintain connections
-**Connection Health Checks**: Monitor connection state every 30 seconds
-**Automatic Reconnection**: Up to 3 attempts before giving up
-**Connection Status Monitoring**: Real-time connection status display
-**Graceful Shutdown**: Proper cleanup of connections and intervals

Ver fichero

@@ -61,15 +61,15 @@ class NetworkExample {
const nodeColor = node.isOracle ? chalk.yellow : chalk.blue; const nodeColor = node.isOracle ? chalk.yellow : chalk.blue;
node.on('peerConnected', (peerId) => { node.on('peerConnected', (peerId) => {
console.log(nodeColor(`${nodeType} ${index}: Connected to ${peerId.substring(0, 8)}`)); // Peer connected
}); });
node.on('peerDisconnected', (peerId) => { node.on('peerDisconnected', (peerId) => {
console.log(nodeColor(`${nodeType} ${index}: Disconnected from ${peerId.substring(0, 8)}`)); // Peer disconnected
}); });
node.on('ringMessage', ({ from, payload }) => { node.on('ringMessage', ({ from, payload }) => {
console.log(nodeColor(`${nodeType} ${index}: Received message from ${from.substring(0, 8)}: ${JSON.stringify(payload)}`)); // Message received
}); });
}); });
} }

46
node.js
Ver fichero

@@ -76,8 +76,6 @@ Config File Example:
} }
} }
console.log(chalk.green('🚀 Starting Ring Network Node...'));
const node = new RingNode(options); const node = new RingNode(options);
// Handle graceful shutdown // Handle graceful shutdown
@@ -95,37 +93,30 @@ process.on('SIGTERM', async () => {
// Connect to bootstrap node if specified // Connect to bootstrap node if specified
if (options.bootstrap) { if (options.bootstrap) {
console.log(chalk.cyan(`🔗 Connecting to bootstrap node: ${options.bootstrap}`));
setTimeout(async () => { setTimeout(async () => {
try { try {
const success = await node.joinRing(options.bootstrap); await node.joinRing(options.bootstrap);
if (success) {
console.log(chalk.green('✅ Successfully joined the ring network!'));
} else {
console.log(chalk.red('❌ Failed to join the ring network'));
}
} catch (error) { } catch (error) {
console.error(chalk.red('Error joining network:'), error.message); // Connection failed
} }
}, 2000); }, 2000);
} }
// Set up event handlers for demonstration // Set up event handlers
node.on('peerConnected', (peerId) => { node.on('peerConnected', (peerId) => {
console.log(chalk.blue(`👋 New peer connected: ${peerId.substring(0, 8)}`)); // Peer connected
}); });
node.on('peerDisconnected', (peerId) => { node.on('peerDisconnected', (peerId) => {
console.log(chalk.red(`👋 Peer disconnected: ${peerId.substring(0, 8)}`)); // Peer disconnected
}); });
node.on('ringMessage', ({ from, payload }) => { node.on('ringMessage', ({ from, payload }) => {
console.log(chalk.magenta(`📨 Ring message from ${from.substring(0, 8)}: ${JSON.stringify(payload)}`)); // Ring message received
}); });
node.on('networkUpdate', ({ from, payload }) => { node.on('networkUpdate', ({ from, payload }) => {
console.log(chalk.cyan(`🔄 Network update from ${from.substring(0, 8)}`)); // Network update received
}); });
// Interactive commands // Interactive commands
@@ -146,7 +137,6 @@ function handleCommand(command) {
if (args.length > 0) { if (args.length > 0) {
const message = args.join(' '); const message = args.join(' ');
node.sendRingMessage({ type: 'chat', content: message }); node.sendRingMessage({ type: 'chat', content: message });
console.log(chalk.green(`📤 Sent: ${message}`));
} }
break; break;
@@ -164,12 +154,34 @@ function handleCommand(command) {
}); });
break; break;
case 'connections':
const status = node.getConnectionStatus();
const persistentConns = node.getPersistentConnections();
console.log(chalk.blue('\n🔗 Connection Status:'));
console.log(` Total Connections: ${status.totalConnections}`);
console.log(` Persistent Connections: ${status.persistentConnections}`);
console.log(` Active Connections: ${status.activeConnections}`);
console.log(` Known Nodes: ${status.knownNodes}`);
console.log(` Oracle Nodes: ${status.oracleNodes}`);
if (persistentConns.length > 0) {
console.log(chalk.blue('\n🔄 Persistent Connections:'));
persistentConns.forEach(conn => {
const timeSince = conn.lastSeen ? `${Math.floor((Date.now() - conn.lastSeen) / 1000)}s ago` : 'never';
const statusColor = conn.connectionState === 'connected' ? chalk.green : chalk.red;
console.log(` - ${conn.nodeId.substring(0, 8)}... [${statusColor(conn.connectionState)}] ${conn.isOracle ? '🔮' : '💾'} (${timeSince})`);
});
}
break;
case 'help': case 'help':
console.log(chalk.blue(` console.log(chalk.blue(`
📚 Available Commands: 📚 Available Commands:
send <message> - Send a message through the ring send <message> - Send a message through the ring
info - Show network information info - Show network information
peers - List connected peers peers - List connected peers
connections - Show persistent connection status
help - Show this help help - Show this help
quit - Exit the node quit - Exit the node
`)); `));

Ver fichero

@@ -84,8 +84,6 @@ Oracle Services:
} }
} }
console.log(chalk.yellow('🚀 Starting Ring Network Oracle Node...'));
const oracle = new OracleNode(options); const oracle = new OracleNode(options);
// Handle graceful shutdown // Handle graceful shutdown
@@ -103,37 +101,30 @@ process.on('SIGTERM', async () => {
// Connect to bootstrap node if specified // Connect to bootstrap node if specified
if (options.bootstrap) { if (options.bootstrap) {
console.log(chalk.cyan(`🔗 Connecting to bootstrap node: ${options.bootstrap}`));
setTimeout(async () => { setTimeout(async () => {
try { try {
const success = await oracle.joinRing(options.bootstrap); await oracle.joinRing(options.bootstrap);
if (success) {
console.log(chalk.green('✅ Successfully joined the ring network as Oracle!'));
} else {
console.log(chalk.red('❌ Failed to join the ring network'));
}
} catch (error) { } catch (error) {
console.error(chalk.red('Error joining network:'), error.message); // Connection failed
} }
}, 2000); }, 2000);
} }
// Set up event handlers // Set up event handlers
oracle.on('peerConnected', (peerId) => { oracle.on('peerConnected', (peerId) => {
console.log(chalk.blue(`👋 New peer connected: ${peerId.substring(0, 8)}`)); // Peer connected
}); });
oracle.on('peerDisconnected', (peerId) => { oracle.on('peerDisconnected', (peerId) => {
console.log(chalk.red(`👋 Peer disconnected: ${peerId.substring(0, 8)}`)); // Peer disconnected
}); });
oracle.on('ringMessage', ({ from, payload }) => { oracle.on('ringMessage', ({ from, payload }) => {
console.log(chalk.magenta(`📨 Ring message from ${from.substring(0, 8)}: ${JSON.stringify(payload)}`)); // Ring message received
}); });
oracle.on('oracleResponse', ({ from, payload }) => { oracle.on('oracleResponse', ({ from, payload }) => {
console.log(chalk.cyan(`🔮 Oracle response from ${from.substring(0, 8)}: ${JSON.stringify(payload)}`)); // Oracle response received
}); });
// Interactive commands // Interactive commands

Ver fichero

@@ -20,7 +20,6 @@ export class OracleNode extends RingNode {
}; };
this.initializeOracleServices(); this.initializeOracleServices();
console.log(chalk.yellow(`🔮✨ Oracle Node ${this.id.substring(0, 8)} initialized with enhanced capabilities`));
} }
initializeOracleServices() { initializeOracleServices() {
@@ -57,8 +56,6 @@ export class OracleNode extends RingNode {
const { query, data, service } = payload; const { query, data, service } = payload;
this.networkMetrics.queriesAnswered++; this.networkMetrics.queriesAnswered++;
console.log(chalk.cyan(`🔮 Processing oracle query: ${service || query}`));
if (service && this.oracleServices.has(service)) { if (service && this.oracleServices.has(service)) {
return this.oracleServices.get(service)(data); return this.oracleServices.get(service)(data);
} }
@@ -429,7 +426,6 @@ export class OracleNode extends RingNode {
}); });
if (!health.healthy) { if (!health.healthy) {
console.log(chalk.red(`⚠️ Network health issue detected`));
this.handleNetworkHealthIssue(health); this.handleNetworkHealthIssue(health);
} }
} }
@@ -437,7 +433,6 @@ export class OracleNode extends RingNode {
handleNetworkHealthIssue(health) { handleNetworkHealthIssue(health) {
// Attempt to reconnect to peers // Attempt to reconnect to peers
if (!health.checks.webrtc) { if (!health.checks.webrtc) {
console.log(chalk.yellow('🔄 Attempting to reconnect to network...'));
// Implement reconnection logic // Implement reconnection logic
} }
@@ -485,7 +480,7 @@ export class OracleNode extends RingNode {
} }
} }
console.log(chalk.blue(`🧹 Cleaned up old data. DataStore: ${this.dataStore.size}, Consensus: ${this.consensusData.size}`)); // Cleaned up old data
} }
// Handle additional message types specific to Oracle // Handle additional message types specific to Oracle
@@ -518,10 +513,8 @@ export class OracleNode extends RingNode {
if (operation === 'set') { if (operation === 'set') {
this.dataStore.set(key, entry); this.dataStore.set(key, entry);
console.log(chalk.blue(`📥 Replicated data: ${key}`));
} else if (operation === 'delete') { } else if (operation === 'delete') {
this.dataStore.delete(key); this.dataStore.delete(key);
console.log(chalk.blue(`🗑️ Deleted replicated data: ${key}`));
} }
} }
@@ -535,8 +528,6 @@ export class OracleNode extends RingNode {
status: 'active', status: 'active',
proposer: from proposer: from
}); });
console.log(chalk.cyan(`📋 New consensus proposal ${proposalId} from ${from.substring(0, 8)}`));
} }
handleConsensusVote(from, payload) { handleConsensusVote(from, payload) {
@@ -546,13 +537,10 @@ export class OracleNode extends RingNode {
if (consensusItem) { if (consensusItem) {
consensusItem.votes.set(voterId, vote); consensusItem.votes.set(voterId, vote);
const result = this.checkConsensus(proposalId); const result = this.checkConsensus(proposalId);
console.log(chalk.cyan(`🗳️ Vote received for ${proposalId}: ${vote} (${result.status})`));
} }
} }
handleHealthAlert(from, payload) { handleHealthAlert(from, payload) {
console.log(chalk.red(`🚨 Health alert from ${from.substring(0, 8)}`));
this.networkMetrics.networkEvents.push({ this.networkMetrics.networkEvents.push({
type: 'health-alert', type: 'health-alert',
timestamp: Date.now(), timestamp: Date.now(),
@@ -565,11 +553,7 @@ export class OracleNode extends RingNode {
const { destination, message } = payload; const { destination, message } = payload;
// Attempt to route the message to the destination // Attempt to route the message to the destination
if (this.webrtc.sendMessage(destination, message)) { this.webrtc.sendMessage(destination, message);
console.log(chalk.green(`📬 Routed message to ${destination.substring(0, 8)}`));
} else {
console.log(chalk.yellow(`📪 Unable to route message to ${destination.substring(0, 8)}`));
}
} }
getOracleInfo() { getOracleInfo() {

Ver fichero

@@ -31,14 +31,14 @@ export class RingNode extends EventEmitter {
this.knownNodes = new Map(); this.knownNodes = new Map();
this.messageHistory = new Set(); this.messageHistory = new Set();
this.oracleNodes = new Set(); this.oracleNodes = new Set();
this.activeSignalingConnections = new Map(); // Store active WebSocket connections for signaling
this.setupWebRTCHandlers(); this.setupWebRTCHandlers();
this.setupDiscoveryServer(); this.setupDiscoveryServer();
console.log(chalk.green(`🔗 Ring Node ${this.id.substring(0, 8)} initialized on port ${this.port}`)); // Start connection management
if (this.isOracle) { this.startConnectionManager();
console.log(chalk.yellow(`🔮 Oracle mode enabled`)); this.startHeartbeat();
}
} }
getRandomPort() { getRandomPort() {
@@ -47,12 +47,10 @@ export class RingNode extends EventEmitter {
setupWebRTCHandlers() { setupWebRTCHandlers() {
this.webrtc.on('peerConnected', (peerId) => { this.webrtc.on('peerConnected', (peerId) => {
console.log(chalk.blue(`✅ Connected to peer: ${peerId.substring(0, 8)}`));
this.emit('peerConnected', peerId); this.emit('peerConnected', peerId);
}); });
this.webrtc.on('peerDisconnected', (peerId) => { this.webrtc.on('peerDisconnected', (peerId) => {
console.log(chalk.red(`❌ Disconnected from peer: ${peerId.substring(0, 8)}`));
this.handlePeerDisconnection(peerId); this.handlePeerDisconnection(peerId);
this.emit('peerDisconnected', peerId); this.emit('peerDisconnected', peerId);
}); });
@@ -62,10 +60,22 @@ export class RingNode extends EventEmitter {
}); });
this.webrtc.on('signal', ({ peerId, signal }) => { this.webrtc.on('signal', ({ peerId, signal }) => {
this.sendSignalingMessage(peerId, { // Try to send signal through stored WebSocket connection
type: 'signal', const signalingWs = this.activeSignalingConnections.get(peerId);
signal if (signalingWs && signalingWs.readyState === signalingWs.OPEN) {
}); signalingWs.send(JSON.stringify({
type: 'signal',
from: this.id,
to: peerId,
payload: signal
}));
} else {
// Fallback to old method
this.sendSignalingMessage(peerId, {
type: 'signal',
signal
});
}
}); });
} }
@@ -78,12 +88,21 @@ export class RingNode extends EventEmitter {
const message = JSON.parse(data.toString()); const message = JSON.parse(data.toString());
await this.handleSignalingMessage(ws, message); await this.handleSignalingMessage(ws, message);
} catch (error) { } catch (error) {
console.error('Error handling signaling message:', error); // Silently handle signaling errors
}
});
// Clean up signaling connections when WebSocket closes
ws.on('close', () => {
// Find and remove this WebSocket from active signaling connections
for (const [peerId, storedWs] of this.activeSignalingConnections.entries()) {
if (storedWs === ws) {
this.activeSignalingConnections.delete(peerId);
break;
}
} }
}); });
}); });
console.log(chalk.cyan(`🌐 Discovery server listening on port ${this.port}`));
} }
async handleSignalingMessage(ws, message) { async handleSignalingMessage(ws, message) {
@@ -105,6 +124,34 @@ export class RingNode extends EventEmitter {
await this.webrtc.handleSignal(from, payload); await this.webrtc.handleSignal(from, payload);
break; break;
case 'webrtc-connect':
// Handle WebRTC connection request
console.log(chalk.cyan(`<EFBFBD> Received WebRTC connection request from ${from.substring(0, 8)}`));
// Check if we already have a connection with this peer
const existingConnection = this.webrtc.connections.get(`${this.id}-${from}`);
if (existingConnection && existingConnection.peer && existingConnection.peer.connected) {
ws.send(JSON.stringify({
type: 'webrtc-connect-response',
success: false,
reason: 'Already connected'
}));
return;
}
// Store the WebSocket connection for signaling
this.activeSignalingConnections.set(from, ws);
// Accept the connection request
ws.send(JSON.stringify({
type: 'webrtc-connect-response',
success: true
}));
// Create connection as receiver (non-initiator)
const newConnection = await this.webrtc.createConnection(from, false);
break;
case 'discovery': case 'discovery':
ws.send(JSON.stringify({ ws.send(JSON.stringify({
type: 'discovery-response', type: 'discovery-response',
@@ -124,21 +171,27 @@ export class RingNode extends EventEmitter {
async connectToPeer(nodeId, address) { async connectToPeer(nodeId, address) {
try { try {
const ws = new WebSocket(`ws://${address}`); const ws = new WebSocket(`ws://${address}`);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let connection = null;
let timeoutId;
let signalHandler, connectHandler;
ws.on('open', async () => { ws.on('open', async () => {
// Send discovery message // Send connection request
ws.send(JSON.stringify({ ws.send(JSON.stringify({
type: 'discovery', type: 'webrtc-connect',
from: this.id from: this.id,
to: nodeId
})); }));
// Create WebRTC connection as initiator // Create WebRTC connection as initiator
const connection = await this.webrtc.createConnection(nodeId, true); connection = await this.webrtc.createConnection(nodeId, true);
// Listen for signaling data from our peer // Listen for signaling data from our peer
const signalHandler = ({ peerId, signal }) => { signalHandler = ({ peerId, signal }) => {
if (peerId === nodeId) { if (peerId === nodeId) {
ws.send(JSON.stringify({ ws.send(JSON.stringify({
type: 'signal', type: 'signal',
@@ -152,11 +205,9 @@ export class RingNode extends EventEmitter {
this.webrtc.on('signal', signalHandler); this.webrtc.on('signal', signalHandler);
// Listen for connection success // Listen for connection success
const connectHandler = (peerId) => { connectHandler = (peerId) => {
if (peerId === nodeId) { if (peerId === nodeId) {
this.webrtc.removeListener('signal', signalHandler); cleanup();
this.webrtc.removeListener('peerConnected', connectHandler);
ws.close();
resolve(true); resolve(true);
} }
}; };
@@ -165,19 +216,45 @@ export class RingNode extends EventEmitter {
}); });
ws.on('message', async (data) => { ws.on('message', async (data) => {
const message = JSON.parse(data.toString()); try {
const message = JSON.parse(data.toString());
if (message.type === 'signal' && message.from === nodeId) { if (message.type === 'signal' && message.from === nodeId) {
await this.webrtc.handleSignal(nodeId, message.payload); await this.webrtc.handleSignal(nodeId, message.payload);
} else if (message.type === 'webrtc-connect-response') {
if (!message.success) {
cleanup();
resolve(false);
}
}
} catch (error) {
// Silently handle parsing errors
} }
}); });
ws.on('error', reject); ws.on('error', (error) => {
cleanup();
reject(error);
});
setTimeout(() => reject(new Error('Connection timeout')), 15000); ws.on('close', () => {
// WebSocket connection closed
});
const cleanup = () => {
if (timeoutId) clearTimeout(timeoutId);
if (signalHandler) this.webrtc.removeListener('signal', signalHandler);
if (connectHandler) this.webrtc.removeListener('peerConnected', connectHandler);
if (ws.readyState === ws.OPEN) ws.close();
};
// Set timeout for connection process
timeoutId = setTimeout(() => {
cleanup();
reject(new Error('Connection timeout'));
}, 20000); // Increased timeout to 20 seconds
}); });
} catch (error) { } catch (error) {
console.error(`Failed to connect to peer ${nodeId}:`, error);
return false; return false;
} }
} }
@@ -202,9 +279,17 @@ export class RingNode extends EventEmitter {
oldMessages.forEach(id => this.messageHistory.delete(id)); oldMessages.forEach(id => this.messageHistory.delete(id));
} }
console.log(chalk.magenta(`📨 Received ${type} from ${from.substring(0, 8)}`)); // Update last seen timestamp for the sender
const senderInfo = this.knownNodes.get(from);
if (senderInfo) {
senderInfo.lastSeen = Date.now();
this.knownNodes.set(from, senderInfo);
}
switch (type) { switch (type) {
case 'heartbeat':
// Heartbeat message - just update lastSeen (already done above)
break;
case 'ring-message': case 'ring-message':
this.handleRingMessage(from, payload, route, messageId); this.handleRingMessage(from, payload, route, messageId);
break; break;
@@ -276,21 +361,20 @@ export class RingNode extends EventEmitter {
} }
} }
console.log(chalk.green(`📤 Sent ring message to ${sent} peers`));
return sent > 0; return sent > 0;
} }
async joinRing(bootstrapNode) { async joinRing(bootstrapNode) {
try { try {
console.log(chalk.yellow(`🔄 Attempting to join ring via ${bootstrapNode}`)); const [host, port] = bootstrapNode.split(':');
const bootstrapAddress = `${host}:${port}`;
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const ws = new WebSocket(`ws://${bootstrapNode}`); const ws = new WebSocket(`ws://${bootstrapAddress}`);
let timeoutId; let timeoutId;
let bootstrapNodeId = null;
ws.on('open', () => { ws.on('open', () => {
console.log(chalk.cyan('🔗 Connected to bootstrap node, sending join request...'));
// Send join ring request directly through WebSocket // Send join ring request directly through WebSocket
ws.send(JSON.stringify({ ws.send(JSON.stringify({
type: 'join-ring', type: 'join-ring',
@@ -303,29 +387,34 @@ export class RingNode extends EventEmitter {
})); }));
}); });
ws.on('message', (data) => { ws.on('message', async (data) => {
try { try {
const message = JSON.parse(data.toString()); const message = JSON.parse(data.toString());
if (message.type === 'join-response') { if (message.type === 'join-response') {
if (message.success) { if (message.success) {
console.log(chalk.green('✅ Successfully joined the ring network!')); bootstrapNodeId = message.bootstrapNodeId;
clearTimeout(timeoutId); clearTimeout(timeoutId);
ws.close(); ws.close();
resolve(true); resolve(true);
} else { } else {
console.log(chalk.red('❌ Join request was rejected'));
clearTimeout(timeoutId); clearTimeout(timeoutId);
ws.close(); ws.close();
resolve(false); resolve(false);
} }
} }
// Handle signaling for WebRTC connection establishment
if (message.type === 'signal' && message.from === bootstrapNodeId) {
await this.webrtc.handleSignal(bootstrapNodeId, message.payload);
}
} catch (error) { } catch (error) {
console.error('Error parsing join response:', error); // Silently handle parsing errors
} }
}); });
ws.on('error', (error) => { ws.on('error', (error) => {
console.error('WebSocket error:', error);
clearTimeout(timeoutId); clearTimeout(timeoutId);
reject(error); reject(error);
}); });
@@ -341,7 +430,6 @@ export class RingNode extends EventEmitter {
}, 15000); // Increased timeout to 15 seconds }, 15000); // Increased timeout to 15 seconds
}); });
} catch (error) { } catch (error) {
console.error('Failed to join ring:', error);
return false; return false;
} }
} }
@@ -349,11 +437,14 @@ export class RingNode extends EventEmitter {
async handleJoinRingRequest(ws, payload) { async handleJoinRingRequest(ws, payload) {
const { nodeId, port, isOracle } = payload; const { nodeId, port, isOracle } = payload;
console.log(chalk.cyan(`🔗 New node ${nodeId.substring(0, 8)} wants to join the ring`));
try { try {
// Add to known nodes // Add to known nodes
this.knownNodes.set(nodeId, { port, isOracle }); this.knownNodes.set(nodeId, {
port,
isOracle,
persistent: false,
lastSeen: Date.now()
});
if (isOracle) { if (isOracle) {
this.oracleNodes.add(nodeId); this.oracleNodes.add(nodeId);
@@ -362,17 +453,36 @@ export class RingNode extends EventEmitter {
// Find optimal position in rings for the new node // Find optimal position in rings for the new node
await this.integrateNewNode(nodeId); await this.integrateNewNode(nodeId);
// Send success response // Send success response with bootstrap node info
ws.send(JSON.stringify({ ws.send(JSON.stringify({
type: 'join-response', type: 'join-response',
success: true, success: true,
message: 'Successfully integrated into ring network' message: 'Successfully integrated into ring network',
bootstrapNodeId: this.id,
isOracle: this.isOracle
})); }));
console.log(chalk.green(`✅ Node ${nodeId.substring(0, 8)} successfully joined the ring`)); // After successful join, try to establish WebRTC connection
} catch (error) { setTimeout(async () => {
console.error(`Error handling join request from ${nodeId}:`, error); try {
const nodeAddress = `localhost:${port}`; // Assuming localhost for now
const connection = await this.connectToPeer(nodeId, nodeAddress);
if (connection) {
// Update known node info to mark as persistent
const nodeInfo = this.knownNodes.get(nodeId);
if (nodeInfo) {
nodeInfo.persistent = true;
nodeInfo.address = nodeAddress;
this.knownNodes.set(nodeId, nodeInfo);
}
// Connection established
}
} catch (connectionError) {
// Connection failed
}
}, 3000); // Give the joining node more time to set up
} catch (error) {
// Send failure response // Send failure response
ws.send(JSON.stringify({ ws.send(JSON.stringify({
type: 'join-response', type: 'join-response',
@@ -395,8 +505,6 @@ export class RingNode extends EventEmitter {
// Notify the network of topology change // Notify the network of topology change
this.broadcastNetworkUpdate(); this.broadcastNetworkUpdate();
console.log(chalk.green(`✅ Integrated node ${nodeId.substring(0, 8)} into rings`));
} }
handlePeerDisconnection(peerId) { handlePeerDisconnection(peerId) {
@@ -454,8 +562,6 @@ export class RingNode extends EventEmitter {
// Oracle-specific methods // Oracle-specific methods
handleOracleQuery(from, payload, messageId) { handleOracleQuery(from, payload, messageId) {
console.log(chalk.yellow(`🔮 Oracle query from ${from.substring(0, 8)}: ${payload.query}`));
// Process oracle query and send response // Process oracle query and send response
const response = this.processOracleQuery(payload); const response = this.processOracleQuery(payload);
@@ -543,7 +649,13 @@ export class RingNode extends EventEmitter {
} }
async destroy() { async destroy() {
console.log(chalk.red(`🛑 Shutting down node ${this.id.substring(0, 8)}`)); // Clean up intervals
if (this.connectionCheckInterval) {
clearInterval(this.connectionCheckInterval);
}
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
// Notify peers of shutdown // Notify peers of shutdown
this.webrtc.broadcast({ this.webrtc.broadcast({
@@ -561,4 +673,96 @@ export class RingNode extends EventEmitter {
this.removeAllListeners(); this.removeAllListeners();
} }
startConnectionManager() {
// Start periodic connection health check and maintenance
this.connectionCheckInterval = setInterval(() => {
this.maintainPersistentConnections();
}, 30000); // Check every 30 seconds
}
async maintainPersistentConnections() {
const now = Date.now();
const staleThreshold = 120000; // 2 minutes
// Check each known node
for (const [nodeId, nodeInfo] of this.knownNodes.entries()) {
if (nodeInfo.persistent) {
const connectionState = this.webrtc.getConnectionState(nodeId);
const lastSeen = nodeInfo.lastSeen || 0;
// If connection is lost or stale, try to reconnect
if (connectionState !== 'connected' || (now - lastSeen) > staleThreshold) {
try {
const connection = await this.connectToPeer(nodeId, nodeInfo.address);
if (connection) {
nodeInfo.lastSeen = now;
this.knownNodes.set(nodeId, nodeInfo);
}
} catch (error) {
// If we can't reconnect after multiple attempts, mark as non-persistent
if (!nodeInfo.reconnectAttempts) nodeInfo.reconnectAttempts = 0;
nodeInfo.reconnectAttempts++;
if (nodeInfo.reconnectAttempts >= 3) {
nodeInfo.persistent = false;
}
}
}
}
}
}
sendHeartbeat() {
// Send heartbeat to all connected peers
const heartbeatMessage = {
id: uuidv4(),
type: 'heartbeat',
from: this.id,
timestamp: Date.now()
};
const connectedPeers = this.webrtc.getConnectedPeers();
if (connectedPeers.length > 0) {
this.webrtc.broadcast(heartbeatMessage);
}
}
startHeartbeat() {
// Send heartbeat every 30 seconds
this.heartbeatInterval = setInterval(() => {
this.sendHeartbeat();
}, 30000);
}
getPersistentConnections() {
const persistentConnections = [];
for (const [nodeId, nodeInfo] of this.knownNodes.entries()) {
if (nodeInfo.persistent) {
const connectionState = this.webrtc.getConnectionState(nodeId);
persistentConnections.push({
nodeId: nodeId,
address: nodeInfo.address,
isOracle: nodeInfo.isOracle,
connectionState: connectionState,
lastSeen: nodeInfo.lastSeen,
reconnectAttempts: nodeInfo.reconnectAttempts || 0
});
}
}
return persistentConnections;
}
getConnectionStatus() {
const connectedPeers = this.webrtc.getConnectedPeers();
const persistentConnections = this.getPersistentConnections();
return {
totalConnections: connectedPeers.length,
persistentConnections: persistentConnections.length,
activeConnections: persistentConnections.filter(conn => conn.connectionState === 'connected').length,
knownNodes: this.knownNodes.size,
oracleNodes: this.oracleNodes.size
};
}
} }

Ver fichero

@@ -19,7 +19,6 @@ export class WebRTCManager extends EventEmitter {
// Validate ICE servers configuration // Validate ICE servers configuration
try { try {
WebRTCManager.validateIceServers(this.iceServers); WebRTCManager.validateIceServers(this.iceServers);
console.log(`🌐 Using ${this.iceServers.length} ICE server(s) for WebRTC connections`);
} catch (error) { } catch (error) {
console.error('Invalid ICE servers configuration:', error.message); console.error('Invalid ICE servers configuration:', error.message);
throw error; throw error;
@@ -61,7 +60,6 @@ export class WebRTCManager extends EventEmitter {
// Handle connection // Handle connection
peer.on('connect', () => { peer.on('connect', () => {
connection.state = 'connected'; connection.state = 'connected';
console.log(`WebRTC connection established with ${peerId.substring(0, 8)}`);
this.emit('peerConnected', peerId); this.emit('peerConnected', peerId);
}); });
@@ -80,14 +78,13 @@ export class WebRTCManager extends EventEmitter {
// Handle errors // Handle errors
peer.on('error', (error) => { peer.on('error', (error) => {
console.error(`WebRTC error with ${peerId}:`, error.message); console.error(`WebRTC error with ${peerId.substring(0, 8)}:`, error.message);
connection.state = 'failed'; connection.state = 'failed';
this.removePeer(peerId); this.removePeer(peerId);
}); });
// Handle close // Handle close
peer.on('close', () => { peer.on('close', () => {
console.log(`WebRTC connection closed with ${peerId.substring(0, 8)}`);
connection.state = 'disconnected'; connection.state = 'disconnected';
this.removePeer(peerId); this.removePeer(peerId);
}); });
@@ -114,7 +111,7 @@ export class WebRTCManager extends EventEmitter {
connection.peer.send(JSON.stringify(message)); connection.peer.send(JSON.stringify(message));
return true; return true;
} catch (error) { } catch (error) {
console.error(`Error sending message to ${peerId}:`, error); // Silently handle send errors
return false; return false;
} }
} }
@@ -131,7 +128,7 @@ export class WebRTCManager extends EventEmitter {
connection.peer.send(JSON.stringify(message)); connection.peer.send(JSON.stringify(message));
sent++; sent++;
} catch (error) { } catch (error) {
console.error(`Error broadcasting to ${connection.peerId}:`, error); // Silently handle broadcast errors
} }
} }
} }
@@ -192,7 +189,7 @@ export class WebRTCManager extends EventEmitter {
// If it's a TURN server, it should have credentials // If it's a TURN server, it should have credentials
if (urls.some(url => url.startsWith('turn:') || url.startsWith('turns:'))) { if (urls.some(url => url.startsWith('turn:') || url.startsWith('turns:'))) {
if (!server.username || !server.credential) { if (!server.username || !server.credential) {
console.warn(`Warning: TURN server ${server.urls} missing username/credential`); // TURN server missing credentials - validation only
} }
} }
} }