- Remove manual --position parameter from CLI - Add automatic optimal position calculation based on gap analysis - Implement dynamic ring topology management - Add position tracking and synchronization across nodes - Add topology command for visual ring structure inspection - Clean up unused variables and dead code - Simplify node setup with automatic positioning
951 líneas
34 KiB
JavaScript
951 líneas
34 KiB
JavaScript
import { EventEmitter } from 'events';
|
||
import { v4 as uuidv4 } from 'uuid';
|
||
import { WebRTCManager } from './webrtc-manager.js';
|
||
import WebSocket, { WebSocketServer } from 'ws';
|
||
import chalk from 'chalk';
|
||
|
||
export class RingNode extends EventEmitter {
|
||
constructor(options = {}) {
|
||
super();
|
||
this.id = options.id || uuidv4();
|
||
this.port = options.port || this.getRandomPort();
|
||
this.ringPosition = null; // Will be assigned automatically
|
||
this.isOracle = options.isOracle || false;
|
||
|
||
// Two rings: inner and outer
|
||
this.rings = {
|
||
inner: {
|
||
left: null, // Previous node in inner ring
|
||
right: null, // Next node in inner ring
|
||
},
|
||
outer: {
|
||
left: null, // Previous node in outer ring
|
||
right: null, // Next node in outer ring
|
||
}
|
||
};
|
||
|
||
this.webrtc = new WebRTCManager(this.id, {
|
||
iceServers: options.iceServers
|
||
});
|
||
this.discoveryServer = null;
|
||
this.knownNodes = new Map();
|
||
this.messageHistory = new Set();
|
||
this.oracleNodes = new Set();
|
||
this.activeSignalingConnections = new Map(); // Store active WebSocket connections for signaling
|
||
this.nodePositions = new Map(); // Track all node positions in the ring
|
||
this.ringSize = 1000; // Virtual ring size for position calculation
|
||
|
||
// Assign initial position for this node
|
||
this.assignPosition(this.id);
|
||
|
||
this.setupWebRTCHandlers();
|
||
this.setupDiscoveryServer();
|
||
|
||
// Start connection management
|
||
this.startConnectionManager();
|
||
this.startHeartbeat();
|
||
}
|
||
|
||
getRandomPort() {
|
||
return Math.floor(Math.random() * (65535 - 49152) + 49152);
|
||
}
|
||
|
||
setupWebRTCHandlers() {
|
||
this.webrtc.on('peerConnected', (peerId) => {
|
||
this.emit('peerConnected', peerId);
|
||
});
|
||
|
||
this.webrtc.on('peerDisconnected', (peerId) => {
|
||
this.handlePeerDisconnection(peerId);
|
||
this.emit('peerDisconnected', peerId);
|
||
});
|
||
|
||
this.webrtc.on('message', ({ from, message }) => {
|
||
this.handleMessage(from, message);
|
||
});
|
||
|
||
this.webrtc.on('signal', ({ peerId, signal }) => {
|
||
// Try to send signal through stored WebSocket connection
|
||
const signalingWs = this.activeSignalingConnections.get(peerId);
|
||
if (signalingWs && signalingWs.readyState === signalingWs.OPEN) {
|
||
signalingWs.send(JSON.stringify({
|
||
type: 'signal',
|
||
from: this.id,
|
||
to: peerId,
|
||
payload: signal
|
||
}));
|
||
} else {
|
||
// Fallback to old method
|
||
this.sendSignalingMessage(peerId, {
|
||
type: 'signal',
|
||
signal
|
||
});
|
||
}
|
||
});
|
||
}
|
||
|
||
setupDiscoveryServer() {
|
||
this.discoveryServer = new WebSocketServer({ port: this.port });
|
||
|
||
this.discoveryServer.on('connection', (ws) => {
|
||
ws.on('message', async (data) => {
|
||
try {
|
||
const message = JSON.parse(data.toString());
|
||
await this.handleSignalingMessage(ws, message);
|
||
} catch (error) {
|
||
// Silently handle signaling errors
|
||
}
|
||
});
|
||
|
||
// Clean up signaling connections when WebSocket closes
|
||
ws.on('close', () => {
|
||
// Find and remove this WebSocket from active signaling connections
|
||
for (const [peerId, storedWs] of this.activeSignalingConnections.entries()) {
|
||
if (storedWs === ws) {
|
||
this.activeSignalingConnections.delete(peerId);
|
||
break;
|
||
}
|
||
}
|
||
});
|
||
});
|
||
}
|
||
|
||
async handleSignalingMessage(ws, message) {
|
||
const { type, from, to, payload } = message;
|
||
|
||
if (to && to !== this.id) {
|
||
// Forward message to the intended recipient
|
||
return;
|
||
}
|
||
|
||
switch (type) {
|
||
case 'signal':
|
||
// Handle WebRTC signaling with simple-peer
|
||
let connection = this.webrtc.connections.get(`${this.id}-${from}`);
|
||
if (!connection) {
|
||
// Create connection as receiver
|
||
connection = await this.webrtc.createConnection(from, false);
|
||
}
|
||
await this.webrtc.handleSignal(from, payload);
|
||
break;
|
||
|
||
case 'webrtc-connect':
|
||
// Handle WebRTC connection request
|
||
console.log(chalk.cyan(`<EFBFBD> Received WebRTC connection request from ${from.substring(0, 8)}`));
|
||
|
||
// Check if we already have a connection with this peer
|
||
const existingConnection = this.webrtc.connections.get(`${this.id}-${from}`);
|
||
if (existingConnection && existingConnection.peer && existingConnection.peer.connected) {
|
||
ws.send(JSON.stringify({
|
||
type: 'webrtc-connect-response',
|
||
success: false,
|
||
reason: 'Already connected'
|
||
}));
|
||
return;
|
||
}
|
||
|
||
// Store the WebSocket connection for signaling
|
||
this.activeSignalingConnections.set(from, ws);
|
||
|
||
// Accept the connection request
|
||
ws.send(JSON.stringify({
|
||
type: 'webrtc-connect-response',
|
||
success: true
|
||
}));
|
||
|
||
// Create connection as receiver (non-initiator)
|
||
const newConnection = await this.webrtc.createConnection(from, false);
|
||
break;
|
||
|
||
case 'discovery':
|
||
ws.send(JSON.stringify({
|
||
type: 'discovery-response',
|
||
nodeId: this.id,
|
||
port: this.port,
|
||
isOracle: this.isOracle,
|
||
ringPosition: this.ringPosition,
|
||
connectedPeers: this.webrtc.getConnectedPeers()
|
||
}));
|
||
break;
|
||
|
||
case 'join-ring':
|
||
await this.handleJoinRingRequest(ws, payload);
|
||
break;
|
||
}
|
||
}
|
||
|
||
async connectToPeer(nodeId, address) {
|
||
try {
|
||
|
||
const ws = new WebSocket(`ws://${address}`);
|
||
|
||
return new Promise((resolve, reject) => {
|
||
let connection = null;
|
||
let timeoutId;
|
||
let signalHandler, connectHandler;
|
||
|
||
ws.on('open', async () => {
|
||
// Send connection request
|
||
ws.send(JSON.stringify({
|
||
type: 'webrtc-connect',
|
||
from: this.id,
|
||
to: nodeId
|
||
}));
|
||
|
||
// Create WebRTC connection as initiator
|
||
connection = await this.webrtc.createConnection(nodeId, true);
|
||
|
||
// Listen for signaling data from our peer
|
||
signalHandler = ({ peerId, signal }) => {
|
||
if (peerId === nodeId) {
|
||
ws.send(JSON.stringify({
|
||
type: 'signal',
|
||
from: this.id,
|
||
to: nodeId,
|
||
payload: signal
|
||
}));
|
||
}
|
||
};
|
||
|
||
this.webrtc.on('signal', signalHandler);
|
||
|
||
// Listen for connection success
|
||
connectHandler = (peerId) => {
|
||
if (peerId === nodeId) {
|
||
cleanup();
|
||
resolve(true);
|
||
}
|
||
};
|
||
|
||
this.webrtc.on('peerConnected', connectHandler);
|
||
});
|
||
|
||
ws.on('message', async (data) => {
|
||
try {
|
||
const message = JSON.parse(data.toString());
|
||
|
||
if (message.type === 'signal' && message.from === nodeId) {
|
||
await this.webrtc.handleSignal(nodeId, message.payload);
|
||
} else if (message.type === 'webrtc-connect-response') {
|
||
if (!message.success) {
|
||
cleanup();
|
||
resolve(false);
|
||
}
|
||
}
|
||
} catch (error) {
|
||
// Silently handle parsing errors
|
||
}
|
||
});
|
||
|
||
ws.on('error', (error) => {
|
||
cleanup();
|
||
reject(error);
|
||
});
|
||
|
||
ws.on('close', () => {
|
||
// WebSocket connection closed
|
||
});
|
||
|
||
const cleanup = () => {
|
||
if (timeoutId) clearTimeout(timeoutId);
|
||
if (signalHandler) this.webrtc.removeListener('signal', signalHandler);
|
||
if (connectHandler) this.webrtc.removeListener('peerConnected', connectHandler);
|
||
if (ws.readyState === ws.OPEN) ws.close();
|
||
};
|
||
|
||
// Set timeout for connection process
|
||
timeoutId = setTimeout(() => {
|
||
cleanup();
|
||
reject(new Error('Connection timeout'));
|
||
}, 20000); // Increased timeout to 20 seconds
|
||
});
|
||
} catch (error) {
|
||
return false;
|
||
}
|
||
}
|
||
|
||
async sendSignalingMessage(peerId, message) {
|
||
// In a real implementation, this would route through a signaling server
|
||
// For now, we'll use direct WebSocket connections
|
||
}
|
||
|
||
handleMessage(from, message) {
|
||
const { id: messageId, type, payload, route } = message;
|
||
|
||
// Prevent message loops
|
||
if (this.messageHistory.has(messageId)) {
|
||
return;
|
||
}
|
||
this.messageHistory.add(messageId);
|
||
|
||
// Clean old message history
|
||
if (this.messageHistory.size > 1000) {
|
||
const oldMessages = Array.from(this.messageHistory).slice(0, 500);
|
||
oldMessages.forEach(id => this.messageHistory.delete(id));
|
||
}
|
||
|
||
// Update last seen timestamp for the sender
|
||
const senderInfo = this.knownNodes.get(from);
|
||
if (senderInfo) {
|
||
senderInfo.lastSeen = Date.now();
|
||
this.knownNodes.set(from, senderInfo);
|
||
}
|
||
|
||
switch (type) {
|
||
case 'heartbeat':
|
||
// Heartbeat message - just update lastSeen (already done above)
|
||
break;
|
||
case 'ring-message':
|
||
this.handleRingMessage(from, payload, route, messageId);
|
||
break;
|
||
case 'oracle-query':
|
||
if (this.isOracle) {
|
||
this.handleOracleQuery(from, payload, messageId);
|
||
}
|
||
break;
|
||
case 'oracle-response':
|
||
this.emit('oracleResponse', { from, payload });
|
||
break;
|
||
case 'network-update':
|
||
this.handleNetworkUpdate(from, payload);
|
||
break;
|
||
default:
|
||
this.emit('message', { from, type, payload });
|
||
}
|
||
}
|
||
|
||
handleRingMessage(from, payload, route, messageId) {
|
||
this.emit('ringMessage', { from, payload });
|
||
|
||
// Forward message along the ring
|
||
this.forwardRingMessage({
|
||
id: messageId,
|
||
type: 'ring-message',
|
||
payload,
|
||
route: [...(route || []), this.id]
|
||
}, from);
|
||
}
|
||
|
||
forwardRingMessage(message, excludeFrom) {
|
||
const route = message.route || [];
|
||
|
||
// Forward in inner ring
|
||
if (this.rings.inner.right && this.rings.inner.right !== excludeFrom) {
|
||
this.webrtc.sendMessage(this.rings.inner.right, message);
|
||
}
|
||
|
||
// Forward in outer ring
|
||
if (this.rings.outer.right && this.rings.outer.right !== excludeFrom) {
|
||
this.webrtc.sendMessage(this.rings.outer.right, message);
|
||
}
|
||
}
|
||
|
||
sendRingMessage(payload, ring = 'both') {
|
||
const message = {
|
||
id: uuidv4(),
|
||
type: 'ring-message',
|
||
payload,
|
||
route: [this.id]
|
||
};
|
||
|
||
let sent = 0;
|
||
|
||
if (ring === 'inner' || ring === 'both') {
|
||
if (this.rings.inner.right) {
|
||
if (this.webrtc.sendMessage(this.rings.inner.right, message)) {
|
||
sent++;
|
||
}
|
||
}
|
||
}
|
||
|
||
if (ring === 'outer' || ring === 'both') {
|
||
if (this.rings.outer.right) {
|
||
if (this.webrtc.sendMessage(this.rings.outer.right, message)) {
|
||
sent++;
|
||
}
|
||
}
|
||
}
|
||
|
||
return sent > 0;
|
||
}
|
||
|
||
async joinRing(bootstrapNode) {
|
||
try {
|
||
const [host, port] = bootstrapNode.split(':');
|
||
const bootstrapAddress = `${host}:${port}`;
|
||
|
||
return new Promise((resolve, reject) => {
|
||
const ws = new WebSocket(`ws://${bootstrapAddress}`);
|
||
let timeoutId;
|
||
let bootstrapNodeId = null;
|
||
|
||
ws.on('open', () => {
|
||
// Send join ring request directly through WebSocket
|
||
ws.send(JSON.stringify({
|
||
type: 'join-ring',
|
||
from: this.id,
|
||
payload: {
|
||
nodeId: this.id,
|
||
port: this.port,
|
||
isOracle: this.isOracle
|
||
}
|
||
}));
|
||
});
|
||
|
||
ws.on('message', async (data) => {
|
||
try {
|
||
const message = JSON.parse(data.toString());
|
||
|
||
if (message.type === 'join-response') {
|
||
if (message.success) {
|
||
bootstrapNodeId = message.bootstrapNodeId;
|
||
|
||
// Update position if provided by bootstrap node
|
||
if (message.assignedPosition !== undefined) {
|
||
this.ringPosition = message.assignedPosition;
|
||
this.nodePositions.set(this.id, message.assignedPosition);
|
||
console.log(chalk.green(`📍 Assigned position ${message.assignedPosition} in the ring`));
|
||
}
|
||
|
||
// Update network topology if provided
|
||
if (message.networkTopology) {
|
||
Object.entries(message.networkTopology).forEach(([nodeId, position]) => {
|
||
this.nodePositions.set(nodeId, position);
|
||
});
|
||
this.updateRingTopology();
|
||
}
|
||
|
||
clearTimeout(timeoutId);
|
||
ws.close();
|
||
resolve(true);
|
||
} else {
|
||
clearTimeout(timeoutId);
|
||
ws.close();
|
||
resolve(false);
|
||
}
|
||
}
|
||
|
||
// Handle signaling for WebRTC connection establishment
|
||
if (message.type === 'signal' && message.from === bootstrapNodeId) {
|
||
await this.webrtc.handleSignal(bootstrapNodeId, message.payload);
|
||
}
|
||
} catch (error) {
|
||
// Silently handle parsing errors
|
||
}
|
||
});
|
||
|
||
ws.on('error', (error) => {
|
||
clearTimeout(timeoutId);
|
||
reject(error);
|
||
});
|
||
|
||
ws.on('close', () => {
|
||
clearTimeout(timeoutId);
|
||
});
|
||
|
||
// Set timeout for join process
|
||
timeoutId = setTimeout(() => {
|
||
ws.close();
|
||
reject(new Error('Connection timeout'));
|
||
}, 15000); // Increased timeout to 15 seconds
|
||
});
|
||
} catch (error) {
|
||
return false;
|
||
}
|
||
}
|
||
|
||
async handleJoinRingRequest(ws, payload) {
|
||
const { nodeId, port, isOracle } = payload;
|
||
|
||
try {
|
||
// Add to known nodes
|
||
this.knownNodes.set(nodeId, {
|
||
port,
|
||
isOracle,
|
||
persistent: false,
|
||
lastSeen: Date.now()
|
||
});
|
||
|
||
if (isOracle) {
|
||
this.oracleNodes.add(nodeId);
|
||
}
|
||
|
||
// Find optimal position for the new node
|
||
const assignedPosition = await this.integrateNewNode(nodeId);
|
||
|
||
// Send success response with position and topology info
|
||
ws.send(JSON.stringify({
|
||
type: 'join-response',
|
||
success: true,
|
||
message: 'Successfully integrated into ring network',
|
||
bootstrapNodeId: this.id,
|
||
isOracle: this.isOracle,
|
||
assignedPosition: assignedPosition,
|
||
networkTopology: Object.fromEntries(this.nodePositions)
|
||
}));
|
||
|
||
// After successful join, try to establish WebRTC connection
|
||
setTimeout(async () => {
|
||
try {
|
||
const nodeAddress = `localhost:${port}`; // Assuming localhost for now
|
||
const connection = await this.connectToPeer(nodeId, nodeAddress);
|
||
if (connection) {
|
||
// Update known node info to mark as persistent
|
||
const nodeInfo = this.knownNodes.get(nodeId);
|
||
if (nodeInfo) {
|
||
nodeInfo.persistent = true;
|
||
nodeInfo.address = nodeAddress;
|
||
this.knownNodes.set(nodeId, nodeInfo);
|
||
}
|
||
// Connection established
|
||
}
|
||
} catch (connectionError) {
|
||
// Connection failed
|
||
}
|
||
}, 3000); // Give the joining node more time to set up
|
||
|
||
} catch (error) {
|
||
// Send failure response
|
||
ws.send(JSON.stringify({
|
||
type: 'join-response',
|
||
success: false,
|
||
message: 'Failed to integrate into ring network'
|
||
}));
|
||
}
|
||
} async integrateNewNode(nodeId) {
|
||
// Automatically assign optimal position for the new node
|
||
const newPosition = this.calculateOptimalPosition();
|
||
this.nodePositions.set(nodeId, newPosition);
|
||
|
||
// Update ring topology with the new node
|
||
this.updateRingTopology();
|
||
|
||
// Notify the network of topology change
|
||
this.broadcastNetworkUpdate();
|
||
|
||
console.log(chalk.green(`🔄 Integrated node ${nodeId} at position ${newPosition}`));
|
||
|
||
return newPosition;
|
||
}
|
||
|
||
// Automatic position management methods
|
||
calculateOptimalPosition() {
|
||
// If this is the first node in the ring
|
||
if (this.nodePositions.size === 0) {
|
||
return 0;
|
||
}
|
||
|
||
// If there's only one node, place the new one opposite to it
|
||
if (this.nodePositions.size === 1) {
|
||
return Math.floor(this.ringSize / 2);
|
||
}
|
||
|
||
// Get all current positions and sort them
|
||
const positions = Array.from(this.nodePositions.values()).sort((a, b) => a - b);
|
||
|
||
// Find the largest gap between consecutive positions
|
||
let largestGap = 0;
|
||
let optimalPosition = 0;
|
||
|
||
for (let i = 0; i < positions.length; i++) {
|
||
const currentPos = positions[i];
|
||
const nextPos = positions[(i + 1) % positions.length];
|
||
|
||
// Calculate gap (considering ring wraparound)
|
||
let gap;
|
||
if (nextPos > currentPos) {
|
||
gap = nextPos - currentPos;
|
||
} else {
|
||
// Wraparound case
|
||
gap = (this.ringSize - currentPos) + nextPos;
|
||
}
|
||
|
||
if (gap > largestGap) {
|
||
largestGap = gap;
|
||
// Place new node in the middle of the largest gap
|
||
optimalPosition = (currentPos + gap / 2) % this.ringSize;
|
||
}
|
||
}
|
||
|
||
return Math.floor(optimalPosition);
|
||
}
|
||
|
||
assignPosition(nodeId, position = null) {
|
||
if (position === null) {
|
||
position = this.calculateOptimalPosition();
|
||
}
|
||
|
||
this.nodePositions.set(nodeId, position);
|
||
|
||
if (nodeId === this.id) {
|
||
this.ringPosition = position;
|
||
}
|
||
|
||
// Update ring topology based on new positions
|
||
this.updateRingTopology();
|
||
|
||
return position;
|
||
}
|
||
|
||
updateRingTopology() {
|
||
// Sort all nodes by their positions
|
||
const sortedNodes = Array.from(this.nodePositions.entries())
|
||
.sort((a, b) => a[1] - b[1]); // Sort by position
|
||
|
||
// Clear current ring connections
|
||
this.rings.inner.left = null;
|
||
this.rings.inner.right = null;
|
||
this.rings.outer.left = null;
|
||
this.rings.outer.right = null;
|
||
|
||
if (sortedNodes.length <= 1) {
|
||
return; // Not enough nodes for a ring
|
||
}
|
||
|
||
// Find this node's index in the sorted list
|
||
const thisNodeIndex = sortedNodes.findIndex(([nodeId]) => nodeId === this.id);
|
||
|
||
if (thisNodeIndex === -1) {
|
||
return; // This node not found in positions
|
||
}
|
||
|
||
const nodeCount = sortedNodes.length;
|
||
|
||
// Calculate neighbors in inner ring (clockwise)
|
||
const innerRightIndex = (thisNodeIndex + 1) % nodeCount;
|
||
const innerLeftIndex = (thisNodeIndex - 1 + nodeCount) % nodeCount;
|
||
|
||
this.rings.inner.right = sortedNodes[innerRightIndex][0];
|
||
this.rings.inner.left = sortedNodes[innerLeftIndex][0];
|
||
|
||
// Calculate neighbors in outer ring (counter-clockwise)
|
||
this.rings.outer.right = sortedNodes[innerLeftIndex][0];
|
||
this.rings.outer.left = sortedNodes[innerRightIndex][0];
|
||
}
|
||
|
||
removeNodePosition(nodeId) {
|
||
this.nodePositions.delete(nodeId);
|
||
this.updateRingTopology();
|
||
}
|
||
|
||
rebalanceRing() {
|
||
// Redistribute all nodes evenly around the ring
|
||
const nodeIds = Array.from(this.nodePositions.keys());
|
||
const nodeCount = nodeIds.length;
|
||
|
||
if (nodeCount === 0) return;
|
||
|
||
const step = this.ringSize / nodeCount;
|
||
|
||
nodeIds.forEach((nodeId, index) => {
|
||
const newPosition = Math.floor(index * step);
|
||
this.nodePositions.set(nodeId, newPosition);
|
||
|
||
if (nodeId === this.id) {
|
||
this.ringPosition = newPosition;
|
||
}
|
||
});
|
||
|
||
this.updateRingTopology();
|
||
}
|
||
|
||
handlePeerDisconnection(peerId) {
|
||
// Remove node position and update topology
|
||
this.removeNodePosition(peerId);
|
||
|
||
this.knownNodes.delete(peerId);
|
||
this.oracleNodes.delete(peerId);
|
||
|
||
console.log(chalk.yellow(`🔄 Node ${peerId} disconnected, ring topology updated`));
|
||
|
||
this.broadcastNetworkUpdate();
|
||
}
|
||
|
||
broadcastNetworkUpdate() {
|
||
const message = {
|
||
id: uuidv4(),
|
||
type: 'network-update',
|
||
payload: {
|
||
nodeId: this.id,
|
||
rings: this.rings,
|
||
knownNodes: Array.from(this.knownNodes.keys()),
|
||
oracleNodes: Array.from(this.oracleNodes),
|
||
nodePositions: Object.fromEntries(this.nodePositions),
|
||
ringPosition: this.ringPosition
|
||
}
|
||
};
|
||
|
||
this.webrtc.broadcast(message);
|
||
}
|
||
|
||
handleNetworkUpdate(from, payload) {
|
||
const { rings, knownNodes, oracleNodes, nodePositions, ringPosition } = payload;
|
||
|
||
// Update our knowledge of network topology
|
||
knownNodes.forEach(nodeId => {
|
||
if (!this.knownNodes.has(nodeId)) {
|
||
this.knownNodes.set(nodeId, {});
|
||
}
|
||
});
|
||
|
||
oracleNodes.forEach(nodeId => {
|
||
this.oracleNodes.add(nodeId);
|
||
});
|
||
|
||
// Update node positions if provided
|
||
if (nodePositions) {
|
||
// Merge position information from other nodes
|
||
Object.entries(nodePositions).forEach(([nodeId, position]) => {
|
||
if (!this.nodePositions.has(nodeId)) {
|
||
this.nodePositions.set(nodeId, position);
|
||
}
|
||
});
|
||
|
||
// Update the sender's position
|
||
if (ringPosition !== undefined) {
|
||
this.nodePositions.set(from, ringPosition);
|
||
}
|
||
|
||
// Recalculate our ring topology based on updated positions
|
||
this.updateRingTopology();
|
||
}
|
||
|
||
this.emit('networkUpdate', { from, payload });
|
||
}
|
||
|
||
// Oracle-specific methods
|
||
handleOracleQuery(from, payload, messageId) {
|
||
// Process oracle query and send response
|
||
const response = this.processOracleQuery(payload);
|
||
|
||
const responseMessage = {
|
||
id: uuidv4(),
|
||
type: 'oracle-response',
|
||
payload: {
|
||
queryId: messageId,
|
||
response,
|
||
timestamp: Date.now()
|
||
}
|
||
};
|
||
|
||
this.webrtc.sendMessage(from, responseMessage);
|
||
}
|
||
|
||
processOracleQuery(payload) {
|
||
const { query, data } = payload;
|
||
|
||
// Simple oracle responses - extend this for your specific use case
|
||
switch (query) {
|
||
case 'network-status':
|
||
return {
|
||
connectedPeers: this.webrtc.getConnectedPeers().length,
|
||
knownNodes: this.knownNodes.size,
|
||
oracleNodes: this.oracleNodes.size,
|
||
rings: this.rings
|
||
};
|
||
case 'timestamp':
|
||
return { timestamp: Date.now() };
|
||
case 'echo':
|
||
return { echo: data };
|
||
default:
|
||
return { error: 'Unknown query type' };
|
||
}
|
||
}
|
||
|
||
queryOracle(query, data = null) {
|
||
const oracleList = Array.from(this.oracleNodes);
|
||
if (oracleList.length === 0) {
|
||
return Promise.reject(new Error('No oracle nodes available'));
|
||
}
|
||
|
||
// Query first available oracle
|
||
const oracleId = oracleList[0];
|
||
const message = {
|
||
id: uuidv4(),
|
||
type: 'oracle-query',
|
||
payload: { query, data }
|
||
};
|
||
|
||
return new Promise((resolve, reject) => {
|
||
const timeout = setTimeout(() => {
|
||
reject(new Error('Oracle query timeout'));
|
||
}, 5000);
|
||
|
||
const responseHandler = ({ from, payload }) => {
|
||
if (from === oracleId) {
|
||
clearTimeout(timeout);
|
||
this.removeListener('oracleResponse', responseHandler);
|
||
resolve(payload.response);
|
||
}
|
||
};
|
||
|
||
this.on('oracleResponse', responseHandler);
|
||
|
||
if (!this.webrtc.sendMessage(oracleId, message)) {
|
||
clearTimeout(timeout);
|
||
this.removeListener('oracleResponse', responseHandler);
|
||
reject(new Error('Failed to send oracle query'));
|
||
}
|
||
});
|
||
}
|
||
|
||
getNetworkInfo() {
|
||
return {
|
||
nodeId: this.id,
|
||
port: this.port,
|
||
isOracle: this.isOracle,
|
||
ringPosition: this.ringPosition,
|
||
rings: this.rings,
|
||
connectedPeers: this.webrtc.getConnectedPeers(),
|
||
knownNodes: Array.from(this.knownNodes.keys()),
|
||
oracleNodes: Array.from(this.oracleNodes),
|
||
nodePositions: Object.fromEntries(this.nodePositions),
|
||
ringTopology: this.getRingTopology()
|
||
};
|
||
}
|
||
|
||
async destroy() {
|
||
// Clean up intervals
|
||
if (this.connectionCheckInterval) {
|
||
clearInterval(this.connectionCheckInterval);
|
||
}
|
||
if (this.heartbeatInterval) {
|
||
clearInterval(this.heartbeatInterval);
|
||
}
|
||
|
||
// Notify peers of shutdown
|
||
this.webrtc.broadcast({
|
||
id: uuidv4(),
|
||
type: 'node-shutdown',
|
||
payload: { nodeId: this.id }
|
||
});
|
||
|
||
// Clean up connections
|
||
this.webrtc.destroy();
|
||
|
||
if (this.discoveryServer) {
|
||
this.discoveryServer.close();
|
||
}
|
||
|
||
this.removeAllListeners();
|
||
}
|
||
|
||
startConnectionManager() {
|
||
// Start periodic connection health check and maintenance
|
||
this.connectionCheckInterval = setInterval(() => {
|
||
this.maintainPersistentConnections();
|
||
}, 30000); // Check every 30 seconds
|
||
}
|
||
|
||
async maintainPersistentConnections() {
|
||
const now = Date.now();
|
||
const staleThreshold = 120000; // 2 minutes
|
||
|
||
// Check each known node
|
||
for (const [nodeId, nodeInfo] of this.knownNodes.entries()) {
|
||
if (nodeInfo.persistent) {
|
||
const connectionState = this.webrtc.getConnectionState(nodeId);
|
||
const lastSeen = nodeInfo.lastSeen || 0;
|
||
|
||
// If connection is lost or stale, try to reconnect
|
||
if (connectionState !== 'connected' || (now - lastSeen) > staleThreshold) {
|
||
try {
|
||
const connection = await this.connectToPeer(nodeId, nodeInfo.address);
|
||
if (connection) {
|
||
nodeInfo.lastSeen = now;
|
||
this.knownNodes.set(nodeId, nodeInfo);
|
||
}
|
||
} catch (error) {
|
||
// If we can't reconnect after multiple attempts, mark as non-persistent
|
||
if (!nodeInfo.reconnectAttempts) nodeInfo.reconnectAttempts = 0;
|
||
nodeInfo.reconnectAttempts++;
|
||
|
||
if (nodeInfo.reconnectAttempts >= 3) {
|
||
nodeInfo.persistent = false;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
sendHeartbeat() {
|
||
// Send heartbeat to all connected peers
|
||
const heartbeatMessage = {
|
||
id: uuidv4(),
|
||
type: 'heartbeat',
|
||
from: this.id,
|
||
timestamp: Date.now()
|
||
};
|
||
|
||
const connectedPeers = this.webrtc.getConnectedPeers();
|
||
if (connectedPeers.length > 0) {
|
||
this.webrtc.broadcast(heartbeatMessage);
|
||
}
|
||
}
|
||
|
||
startHeartbeat() {
|
||
// Send heartbeat every 30 seconds
|
||
this.heartbeatInterval = setInterval(() => {
|
||
this.sendHeartbeat();
|
||
}, 30000);
|
||
}
|
||
|
||
getPersistentConnections() {
|
||
const persistentConnections = [];
|
||
for (const [nodeId, nodeInfo] of this.knownNodes.entries()) {
|
||
if (nodeInfo.persistent) {
|
||
const connectionState = this.webrtc.getConnectionState(nodeId);
|
||
persistentConnections.push({
|
||
nodeId: nodeId,
|
||
address: nodeInfo.address,
|
||
isOracle: nodeInfo.isOracle,
|
||
connectionState: connectionState,
|
||
lastSeen: nodeInfo.lastSeen,
|
||
reconnectAttempts: nodeInfo.reconnectAttempts || 0
|
||
});
|
||
}
|
||
}
|
||
return persistentConnections;
|
||
}
|
||
|
||
getConnectionStatus() {
|
||
const connectedPeers = this.webrtc.getConnectedPeers();
|
||
const persistentConnections = this.getPersistentConnections();
|
||
|
||
return {
|
||
totalConnections: connectedPeers.length,
|
||
persistentConnections: persistentConnections.length,
|
||
activeConnections: persistentConnections.filter(conn => conn.connectionState === 'connected').length,
|
||
knownNodes: this.knownNodes.size,
|
||
oracleNodes: this.oracleNodes.size
|
||
};
|
||
}
|
||
|
||
getRingTopology() {
|
||
// Create a visual representation of the ring topology
|
||
const sortedNodes = Array.from(this.nodePositions.entries())
|
||
.sort((a, b) => a[1] - b[1]); // Sort by position
|
||
|
||
return {
|
||
totalNodes: sortedNodes.length,
|
||
ringSize: this.ringSize,
|
||
nodes: sortedNodes.map(([nodeId, position]) => ({
|
||
nodeId: nodeId.substring(0, 8), // Short version for display
|
||
position: position,
|
||
isThisNode: nodeId === this.id,
|
||
isOracle: this.oracleNodes.has(nodeId),
|
||
isConnected: this.webrtc.getConnectedPeers().includes(nodeId)
|
||
})),
|
||
innerRing: {
|
||
left: this.rings.inner.left?.substring(0, 8),
|
||
right: this.rings.inner.right?.substring(0, 8)
|
||
},
|
||
outerRing: {
|
||
left: this.rings.outer.left?.substring(0, 8),
|
||
right: this.rings.outer.right?.substring(0, 8)
|
||
}
|
||
};
|
||
}
|
||
}
|