584
src/oracle-node.js
Archivo normal
584
src/oracle-node.js
Archivo normal
@@ -0,0 +1,584 @@
|
||||
import { RingNode } from './ring-node.js';
|
||||
import chalk from 'chalk';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
|
||||
export class OracleNode extends RingNode {
|
||||
constructor(options = {}) {
|
||||
super({
|
||||
...options,
|
||||
isOracle: true
|
||||
});
|
||||
|
||||
this.oracleServices = new Map();
|
||||
this.dataStore = new Map();
|
||||
this.consensusData = new Map();
|
||||
this.networkMetrics = {
|
||||
startTime: Date.now(),
|
||||
messagesProcessed: 0,
|
||||
queriesAnswered: 0,
|
||||
networkEvents: []
|
||||
};
|
||||
|
||||
this.initializeOracleServices();
|
||||
console.log(chalk.yellow(`🔮✨ Oracle Node ${this.id.substring(0, 8)} initialized with enhanced capabilities`));
|
||||
}
|
||||
|
||||
initializeOracleServices() {
|
||||
// Register oracle services
|
||||
this.oracleServices.set('network-analysis', this.analyzeNetwork.bind(this));
|
||||
this.oracleServices.set('data-storage', this.handleDataStorage.bind(this));
|
||||
this.oracleServices.set('consensus', this.handleConsensus.bind(this));
|
||||
this.oracleServices.set('routing', this.handleRouting.bind(this));
|
||||
this.oracleServices.set('health-check', this.performHealthCheck.bind(this));
|
||||
this.oracleServices.set('network-metrics', this.getNetworkMetrics.bind(this));
|
||||
|
||||
// Start periodic tasks
|
||||
this.startPeriodicTasks();
|
||||
}
|
||||
|
||||
startPeriodicTasks() {
|
||||
// Network health monitoring
|
||||
setInterval(() => {
|
||||
this.monitorNetworkHealth();
|
||||
}, 30000); // Every 30 seconds
|
||||
|
||||
// Metrics collection
|
||||
setInterval(() => {
|
||||
this.collectMetrics();
|
||||
}, 10000); // Every 10 seconds
|
||||
|
||||
// Cleanup old data
|
||||
setInterval(() => {
|
||||
this.cleanupOldData();
|
||||
}, 300000); // Every 5 minutes
|
||||
}
|
||||
|
||||
processOracleQuery(payload) {
|
||||
const { query, data, service } = payload;
|
||||
this.networkMetrics.queriesAnswered++;
|
||||
|
||||
console.log(chalk.cyan(`🔮 Processing oracle query: ${service || query}`));
|
||||
|
||||
if (service && this.oracleServices.has(service)) {
|
||||
return this.oracleServices.get(service)(data);
|
||||
}
|
||||
|
||||
// Fallback to parent class handling
|
||||
return super.processOracleQuery(payload);
|
||||
}
|
||||
|
||||
analyzeNetwork(data) {
|
||||
const connectedPeers = this.webrtc.getConnectedPeers();
|
||||
const analysis = {
|
||||
networkSize: this.knownNodes.size + 1, // +1 for this node
|
||||
connectedPeers: connectedPeers.length,
|
||||
oracleNodes: this.oracleNodes.size,
|
||||
ringTopology: {
|
||||
inner: {
|
||||
hasLeft: !!this.rings.inner.left,
|
||||
hasRight: !!this.rings.inner.right,
|
||||
complete: !!this.rings.inner.left && !!this.rings.inner.right
|
||||
},
|
||||
outer: {
|
||||
hasLeft: !!this.rings.outer.left,
|
||||
hasRight: !!this.rings.outer.right,
|
||||
complete: !!this.rings.outer.left && !!this.rings.outer.right
|
||||
}
|
||||
},
|
||||
networkHealth: this.calculateNetworkHealth(),
|
||||
recommendations: this.generateNetworkRecommendations()
|
||||
};
|
||||
|
||||
return analysis;
|
||||
}
|
||||
|
||||
calculateNetworkHealth() {
|
||||
const connectedPeers = this.webrtc.getConnectedPeers().length;
|
||||
const expectedConnections = 4; // 2 for each ring (left and right)
|
||||
const connectionHealth = Math.min(connectedPeers / expectedConnections, 1.0);
|
||||
|
||||
const ringHealth = {
|
||||
inner: (this.rings.inner.left ? 0.5 : 0) + (this.rings.inner.right ? 0.5 : 0),
|
||||
outer: (this.rings.outer.left ? 0.5 : 0) + (this.rings.outer.right ? 0.5 : 0)
|
||||
};
|
||||
|
||||
const overallHealth = (connectionHealth + ringHealth.inner + ringHealth.outer) / 3;
|
||||
|
||||
return {
|
||||
overall: Math.round(overallHealth * 100),
|
||||
connections: Math.round(connectionHealth * 100),
|
||||
innerRing: Math.round(ringHealth.inner * 100),
|
||||
outerRing: Math.round(ringHealth.outer * 100)
|
||||
};
|
||||
}
|
||||
|
||||
generateNetworkRecommendations() {
|
||||
const recommendations = [];
|
||||
const health = this.calculateNetworkHealth();
|
||||
|
||||
if (health.overall < 70) {
|
||||
recommendations.push('Network health is low - consider adding more nodes');
|
||||
}
|
||||
|
||||
if (!this.rings.inner.left || !this.rings.inner.right) {
|
||||
recommendations.push('Inner ring is incomplete - seeking connections');
|
||||
}
|
||||
|
||||
if (!this.rings.outer.left || !this.rings.outer.right) {
|
||||
recommendations.push('Outer ring is incomplete - seeking connections');
|
||||
}
|
||||
|
||||
if (this.oracleNodes.size < 2) {
|
||||
recommendations.push('Consider adding more oracle nodes for redundancy');
|
||||
}
|
||||
|
||||
return recommendations;
|
||||
}
|
||||
|
||||
handleDataStorage(data) {
|
||||
const { operation, key, value, ttl } = data;
|
||||
|
||||
switch (operation) {
|
||||
case 'set':
|
||||
const entry = {
|
||||
value,
|
||||
timestamp: Date.now(),
|
||||
ttl: ttl || 3600000, // Default 1 hour TTL
|
||||
nodeId: this.id
|
||||
};
|
||||
this.dataStore.set(key, entry);
|
||||
|
||||
// Replicate to other oracles for redundancy
|
||||
this.replicateData(key, entry);
|
||||
|
||||
return { success: true, key, timestamp: entry.timestamp };
|
||||
|
||||
case 'get':
|
||||
const storedEntry = this.dataStore.get(key);
|
||||
if (!storedEntry) {
|
||||
return { success: false, error: 'Key not found' };
|
||||
}
|
||||
|
||||
// Check TTL
|
||||
if (Date.now() - storedEntry.timestamp > storedEntry.ttl) {
|
||||
this.dataStore.delete(key);
|
||||
return { success: false, error: 'Key expired' };
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
key,
|
||||
value: storedEntry.value,
|
||||
timestamp: storedEntry.timestamp
|
||||
};
|
||||
|
||||
case 'delete':
|
||||
const deleted = this.dataStore.delete(key);
|
||||
if (deleted) {
|
||||
this.replicateDataDeletion(key);
|
||||
}
|
||||
return { success: deleted };
|
||||
|
||||
case 'list':
|
||||
const keys = Array.from(this.dataStore.keys());
|
||||
return { success: true, keys, count: keys.length };
|
||||
|
||||
default:
|
||||
return { success: false, error: 'Unknown operation' };
|
||||
}
|
||||
}
|
||||
|
||||
replicateData(key, entry) {
|
||||
const replicationMessage = {
|
||||
id: uuidv4(),
|
||||
type: 'data-replication',
|
||||
payload: {
|
||||
operation: 'set',
|
||||
key,
|
||||
entry
|
||||
}
|
||||
};
|
||||
|
||||
// Send to other oracle nodes
|
||||
this.oracleNodes.forEach(oracleId => {
|
||||
if (oracleId !== this.id) {
|
||||
this.webrtc.sendMessage(oracleId, replicationMessage);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
replicateDataDeletion(key) {
|
||||
const replicationMessage = {
|
||||
id: uuidv4(),
|
||||
type: 'data-replication',
|
||||
payload: {
|
||||
operation: 'delete',
|
||||
key
|
||||
}
|
||||
};
|
||||
|
||||
this.oracleNodes.forEach(oracleId => {
|
||||
if (oracleId !== this.id) {
|
||||
this.webrtc.sendMessage(oracleId, replicationMessage);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
handleConsensus(data) {
|
||||
const { proposalId, proposal, vote } = data;
|
||||
|
||||
if (proposal && !vote) {
|
||||
// New proposal
|
||||
this.consensusData.set(proposalId, {
|
||||
proposal,
|
||||
votes: new Map(),
|
||||
timestamp: Date.now(),
|
||||
status: 'active'
|
||||
});
|
||||
|
||||
// Broadcast proposal to other oracles
|
||||
this.broadcastConsensusProposal(proposalId, proposal);
|
||||
|
||||
return { success: true, proposalId, status: 'proposal_created' };
|
||||
}
|
||||
|
||||
if (vote && proposalId) {
|
||||
// Vote on existing proposal
|
||||
const consensusItem = this.consensusData.get(proposalId);
|
||||
if (!consensusItem) {
|
||||
return { success: false, error: 'Proposal not found' };
|
||||
}
|
||||
|
||||
consensusItem.votes.set(this.id, vote);
|
||||
|
||||
// Broadcast vote
|
||||
this.broadcastConsensusVote(proposalId, vote);
|
||||
|
||||
// Check if consensus reached
|
||||
const result = this.checkConsensus(proposalId);
|
||||
return { success: true, proposalId, vote, consensus: result };
|
||||
}
|
||||
|
||||
return { success: false, error: 'Invalid consensus request' };
|
||||
}
|
||||
|
||||
broadcastConsensusProposal(proposalId, proposal) {
|
||||
const message = {
|
||||
id: uuidv4(),
|
||||
type: 'consensus-proposal',
|
||||
payload: { proposalId, proposal }
|
||||
};
|
||||
|
||||
this.oracleNodes.forEach(oracleId => {
|
||||
if (oracleId !== this.id) {
|
||||
this.webrtc.sendMessage(oracleId, message);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
broadcastConsensusVote(proposalId, vote) {
|
||||
const message = {
|
||||
id: uuidv4(),
|
||||
type: 'consensus-vote',
|
||||
payload: { proposalId, vote, voterId: this.id }
|
||||
};
|
||||
|
||||
this.oracleNodes.forEach(oracleId => {
|
||||
if (oracleId !== this.id) {
|
||||
this.webrtc.sendMessage(oracleId, message);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
checkConsensus(proposalId) {
|
||||
const consensusItem = this.consensusData.get(proposalId);
|
||||
if (!consensusItem) return null;
|
||||
|
||||
const totalOracles = this.oracleNodes.size + 1; // +1 for this node
|
||||
const votesNeeded = Math.floor(totalOracles * 0.6) + 1; // 60% majority
|
||||
|
||||
const votes = Array.from(consensusItem.votes.values());
|
||||
const yesVotes = votes.filter(vote => vote === 'yes').length;
|
||||
const noVotes = votes.filter(vote => vote === 'no').length;
|
||||
|
||||
if (yesVotes >= votesNeeded) {
|
||||
consensusItem.status = 'approved';
|
||||
return { status: 'approved', votes: { yes: yesVotes, no: noVotes, total: votes.length } };
|
||||
} else if (noVotes >= votesNeeded) {
|
||||
consensusItem.status = 'rejected';
|
||||
return { status: 'rejected', votes: { yes: yesVotes, no: noVotes, total: votes.length } };
|
||||
}
|
||||
|
||||
return { status: 'pending', votes: { yes: yesVotes, no: noVotes, total: votes.length } };
|
||||
}
|
||||
|
||||
handleRouting(data) {
|
||||
const { destination, message, strategy } = data;
|
||||
|
||||
switch (strategy) {
|
||||
case 'shortest-path':
|
||||
return this.findShortestPath(destination);
|
||||
case 'ring-flood':
|
||||
return this.performRingFlood(message);
|
||||
case 'oracle-route':
|
||||
return this.routeThroughOracles(destination, message);
|
||||
default:
|
||||
return { success: false, error: 'Unknown routing strategy' };
|
||||
}
|
||||
}
|
||||
|
||||
findShortestPath(destination) {
|
||||
// Simple shortest path - in a real implementation, this would use
|
||||
// algorithms like Dijkstra's or A*
|
||||
const connectedPeers = this.webrtc.getConnectedPeers();
|
||||
|
||||
if (connectedPeers.includes(destination)) {
|
||||
return {
|
||||
success: true,
|
||||
path: [this.id, destination],
|
||||
hops: 1
|
||||
};
|
||||
}
|
||||
|
||||
// Check if any connected peer can reach destination
|
||||
// This is a simplified version - real implementation would maintain
|
||||
// routing tables and use network topology information
|
||||
return {
|
||||
success: false,
|
||||
error: 'Destination not reachable',
|
||||
knownPeers: connectedPeers
|
||||
};
|
||||
}
|
||||
|
||||
performRingFlood(message) {
|
||||
const flooded = this.sendRingMessage(message, 'both');
|
||||
return {
|
||||
success: flooded,
|
||||
strategy: 'ring-flood',
|
||||
timestamp: Date.now()
|
||||
};
|
||||
}
|
||||
|
||||
routeThroughOracles(destination, message) {
|
||||
const oracleList = Array.from(this.oracleNodes);
|
||||
let routedCount = 0;
|
||||
|
||||
oracleList.forEach(oracleId => {
|
||||
if (oracleId !== this.id) {
|
||||
const routingMessage = {
|
||||
id: uuidv4(),
|
||||
type: 'oracle-routing',
|
||||
payload: { destination, message }
|
||||
};
|
||||
|
||||
if (this.webrtc.sendMessage(oracleId, routingMessage)) {
|
||||
routedCount++;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
success: routedCount > 0,
|
||||
oraclesContacted: routedCount,
|
||||
strategy: 'oracle-route'
|
||||
};
|
||||
}
|
||||
|
||||
performHealthCheck(data) {
|
||||
const checks = {
|
||||
webrtc: this.webrtc.getConnectedPeers().length > 0,
|
||||
rings: {
|
||||
inner: !!this.rings.inner.left || !!this.rings.inner.right,
|
||||
outer: !!this.rings.outer.left || !!this.rings.outer.right
|
||||
},
|
||||
dataStore: this.dataStore.size >= 0,
|
||||
memory: process.memoryUsage(),
|
||||
uptime: Date.now() - this.networkMetrics.startTime,
|
||||
services: Array.from(this.oracleServices.keys())
|
||||
};
|
||||
|
||||
const isHealthy = checks.webrtc && (checks.rings.inner || checks.rings.outer);
|
||||
|
||||
return {
|
||||
healthy: isHealthy,
|
||||
checks,
|
||||
timestamp: Date.now(),
|
||||
nodeId: this.id
|
||||
};
|
||||
}
|
||||
|
||||
getNetworkMetrics() {
|
||||
return {
|
||||
...this.networkMetrics,
|
||||
dataStoreSize: this.dataStore.size,
|
||||
consensusItems: this.consensusData.size,
|
||||
connectedPeers: this.webrtc.getConnectedPeers().length,
|
||||
knownNodes: this.knownNodes.size,
|
||||
oracleNodes: this.oracleNodes.size,
|
||||
memoryUsage: process.memoryUsage(),
|
||||
uptime: Date.now() - this.networkMetrics.startTime
|
||||
};
|
||||
}
|
||||
|
||||
monitorNetworkHealth() {
|
||||
const health = this.performHealthCheck();
|
||||
this.networkMetrics.networkEvents.push({
|
||||
type: 'health-check',
|
||||
timestamp: Date.now(),
|
||||
data: health
|
||||
});
|
||||
|
||||
if (!health.healthy) {
|
||||
console.log(chalk.red(`⚠️ Network health issue detected`));
|
||||
this.handleNetworkHealthIssue(health);
|
||||
}
|
||||
}
|
||||
|
||||
handleNetworkHealthIssue(health) {
|
||||
// Attempt to reconnect to peers
|
||||
if (!health.checks.webrtc) {
|
||||
console.log(chalk.yellow('🔄 Attempting to reconnect to network...'));
|
||||
// Implement reconnection logic
|
||||
}
|
||||
|
||||
// Notify other oracles of health issues
|
||||
const alertMessage = {
|
||||
id: uuidv4(),
|
||||
type: 'health-alert',
|
||||
payload: {
|
||||
nodeId: this.id,
|
||||
healthStatus: health,
|
||||
timestamp: Date.now()
|
||||
}
|
||||
};
|
||||
|
||||
this.oracleNodes.forEach(oracleId => {
|
||||
if (oracleId !== this.id) {
|
||||
this.webrtc.sendMessage(oracleId, alertMessage);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
collectMetrics() {
|
||||
this.networkMetrics.messagesProcessed++;
|
||||
|
||||
// Keep only recent events (last 1000)
|
||||
if (this.networkMetrics.networkEvents.length > 1000) {
|
||||
this.networkMetrics.networkEvents = this.networkMetrics.networkEvents.slice(-1000);
|
||||
}
|
||||
}
|
||||
|
||||
cleanupOldData() {
|
||||
const now = Date.now();
|
||||
|
||||
// Clean expired data store entries
|
||||
for (const [key, entry] of this.dataStore) {
|
||||
if (now - entry.timestamp > entry.ttl) {
|
||||
this.dataStore.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
// Clean old consensus data (older than 24 hours)
|
||||
for (const [proposalId, item] of this.consensusData) {
|
||||
if (now - item.timestamp > 86400000) { // 24 hours
|
||||
this.consensusData.delete(proposalId);
|
||||
}
|
||||
}
|
||||
|
||||
console.log(chalk.blue(`🧹 Cleaned up old data. DataStore: ${this.dataStore.size}, Consensus: ${this.consensusData.size}`));
|
||||
}
|
||||
|
||||
// Handle additional message types specific to Oracle
|
||||
handleMessage(from, message) {
|
||||
const { type, payload } = message;
|
||||
|
||||
switch (type) {
|
||||
case 'data-replication':
|
||||
this.handleDataReplication(from, payload);
|
||||
break;
|
||||
case 'consensus-proposal':
|
||||
this.handleConsensusProposal(from, payload);
|
||||
break;
|
||||
case 'consensus-vote':
|
||||
this.handleConsensusVote(from, payload);
|
||||
break;
|
||||
case 'health-alert':
|
||||
this.handleHealthAlert(from, payload);
|
||||
break;
|
||||
case 'oracle-routing':
|
||||
this.handleOracleRouting(from, payload);
|
||||
break;
|
||||
default:
|
||||
super.handleMessage(from, message);
|
||||
}
|
||||
}
|
||||
|
||||
handleDataReplication(from, payload) {
|
||||
const { operation, key, entry } = payload;
|
||||
|
||||
if (operation === 'set') {
|
||||
this.dataStore.set(key, entry);
|
||||
console.log(chalk.blue(`📥 Replicated data: ${key}`));
|
||||
} else if (operation === 'delete') {
|
||||
this.dataStore.delete(key);
|
||||
console.log(chalk.blue(`🗑️ Deleted replicated data: ${key}`));
|
||||
}
|
||||
}
|
||||
|
||||
handleConsensusProposal(from, payload) {
|
||||
const { proposalId, proposal } = payload;
|
||||
|
||||
this.consensusData.set(proposalId, {
|
||||
proposal,
|
||||
votes: new Map(),
|
||||
timestamp: Date.now(),
|
||||
status: 'active',
|
||||
proposer: from
|
||||
});
|
||||
|
||||
console.log(chalk.cyan(`📋 New consensus proposal ${proposalId} from ${from.substring(0, 8)}`));
|
||||
}
|
||||
|
||||
handleConsensusVote(from, payload) {
|
||||
const { proposalId, vote, voterId } = payload;
|
||||
const consensusItem = this.consensusData.get(proposalId);
|
||||
|
||||
if (consensusItem) {
|
||||
consensusItem.votes.set(voterId, vote);
|
||||
const result = this.checkConsensus(proposalId);
|
||||
|
||||
console.log(chalk.cyan(`🗳️ Vote received for ${proposalId}: ${vote} (${result.status})`));
|
||||
}
|
||||
}
|
||||
|
||||
handleHealthAlert(from, payload) {
|
||||
console.log(chalk.red(`🚨 Health alert from ${from.substring(0, 8)}`));
|
||||
this.networkMetrics.networkEvents.push({
|
||||
type: 'health-alert',
|
||||
timestamp: Date.now(),
|
||||
from,
|
||||
data: payload
|
||||
});
|
||||
}
|
||||
|
||||
handleOracleRouting(from, payload) {
|
||||
const { destination, message } = payload;
|
||||
|
||||
// Attempt to route the message to the destination
|
||||
if (this.webrtc.sendMessage(destination, message)) {
|
||||
console.log(chalk.green(`📬 Routed message to ${destination.substring(0, 8)}`));
|
||||
} else {
|
||||
console.log(chalk.yellow(`📪 Unable to route message to ${destination.substring(0, 8)}`));
|
||||
}
|
||||
}
|
||||
|
||||
getOracleInfo() {
|
||||
return {
|
||||
...this.getNetworkInfo(),
|
||||
oracleServices: Array.from(this.oracleServices.keys()),
|
||||
dataStoreSize: this.dataStore.size,
|
||||
consensusItems: this.consensusData.size,
|
||||
metrics: this.getNetworkMetrics()
|
||||
};
|
||||
}
|
||||
}
|
||||
508
src/ring-node.js
Archivo normal
508
src/ring-node.js
Archivo normal
@@ -0,0 +1,508 @@
|
||||
import { EventEmitter } from 'events';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import { WebRTCManager } from './webrtc-manager.js';
|
||||
import WebSocket, { WebSocketServer } from 'ws';
|
||||
import chalk from 'chalk';
|
||||
|
||||
export class RingNode extends EventEmitter {
|
||||
constructor(options = {}) {
|
||||
super();
|
||||
this.id = options.id || uuidv4();
|
||||
this.port = options.port || this.getRandomPort();
|
||||
this.ringPosition = options.ringPosition || 0;
|
||||
this.isOracle = options.isOracle || false;
|
||||
|
||||
// Two rings: inner and outer
|
||||
this.rings = {
|
||||
inner: {
|
||||
left: null, // Previous node in inner ring
|
||||
right: null, // Next node in inner ring
|
||||
},
|
||||
outer: {
|
||||
left: null, // Previous node in outer ring
|
||||
right: null, // Next node in outer ring
|
||||
}
|
||||
};
|
||||
|
||||
this.webrtc = new WebRTCManager(this.id);
|
||||
this.discoveryServer = null;
|
||||
this.knownNodes = new Map();
|
||||
this.messageHistory = new Set();
|
||||
this.oracleNodes = new Set();
|
||||
|
||||
this.setupWebRTCHandlers();
|
||||
this.setupDiscoveryServer();
|
||||
|
||||
console.log(chalk.green(`🔗 Ring Node ${this.id.substring(0, 8)} initialized on port ${this.port}`));
|
||||
if (this.isOracle) {
|
||||
console.log(chalk.yellow(`🔮 Oracle mode enabled`));
|
||||
}
|
||||
}
|
||||
|
||||
getRandomPort() {
|
||||
return Math.floor(Math.random() * (65535 - 49152) + 49152);
|
||||
}
|
||||
|
||||
setupWebRTCHandlers() {
|
||||
this.webrtc.on('peerConnected', (peerId) => {
|
||||
console.log(chalk.blue(`✅ Connected to peer: ${peerId.substring(0, 8)}`));
|
||||
this.emit('peerConnected', peerId);
|
||||
});
|
||||
|
||||
this.webrtc.on('peerDisconnected', (peerId) => {
|
||||
console.log(chalk.red(`❌ Disconnected from peer: ${peerId.substring(0, 8)}`));
|
||||
this.handlePeerDisconnection(peerId);
|
||||
this.emit('peerDisconnected', peerId);
|
||||
});
|
||||
|
||||
this.webrtc.on('message', ({ from, message }) => {
|
||||
this.handleMessage(from, message);
|
||||
});
|
||||
|
||||
this.webrtc.on('signal', ({ peerId, signal }) => {
|
||||
this.sendSignalingMessage(peerId, {
|
||||
type: 'signal',
|
||||
signal
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
setupDiscoveryServer() {
|
||||
this.discoveryServer = new WebSocketServer({ port: this.port });
|
||||
|
||||
this.discoveryServer.on('connection', (ws) => {
|
||||
ws.on('message', async (data) => {
|
||||
try {
|
||||
const message = JSON.parse(data.toString());
|
||||
await this.handleSignalingMessage(ws, message);
|
||||
} catch (error) {
|
||||
console.error('Error handling signaling message:', error);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
console.log(chalk.cyan(`🌐 Discovery server listening on port ${this.port}`));
|
||||
}
|
||||
|
||||
async handleSignalingMessage(ws, message) {
|
||||
const { type, from, to, payload } = message;
|
||||
|
||||
if (to && to !== this.id) {
|
||||
// Forward message to the intended recipient
|
||||
return;
|
||||
}
|
||||
|
||||
switch (type) {
|
||||
case 'signal':
|
||||
// Handle WebRTC signaling with simple-peer
|
||||
let connection = this.webrtc.connections.get(`${this.id}-${from}`);
|
||||
if (!connection) {
|
||||
// Create connection as receiver
|
||||
connection = await this.webrtc.createConnection(from, false);
|
||||
}
|
||||
await this.webrtc.handleSignal(from, payload);
|
||||
break;
|
||||
|
||||
case 'discovery':
|
||||
ws.send(JSON.stringify({
|
||||
type: 'discovery-response',
|
||||
nodeId: this.id,
|
||||
port: this.port,
|
||||
isOracle: this.isOracle,
|
||||
ringPosition: this.ringPosition,
|
||||
connectedPeers: this.webrtc.getConnectedPeers()
|
||||
}));
|
||||
break;
|
||||
|
||||
case 'join-ring':
|
||||
await this.handleJoinRingRequest(ws, payload);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
async connectToPeer(nodeId, address) {
|
||||
try {
|
||||
const ws = new WebSocket(`ws://${address}`);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
ws.on('open', async () => {
|
||||
// Send discovery message
|
||||
ws.send(JSON.stringify({
|
||||
type: 'discovery',
|
||||
from: this.id
|
||||
}));
|
||||
|
||||
// Create WebRTC connection as initiator
|
||||
const connection = await this.webrtc.createConnection(nodeId, true);
|
||||
|
||||
// Listen for signaling data from our peer
|
||||
const signalHandler = ({ peerId, signal }) => {
|
||||
if (peerId === nodeId) {
|
||||
ws.send(JSON.stringify({
|
||||
type: 'signal',
|
||||
from: this.id,
|
||||
to: nodeId,
|
||||
payload: signal
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
this.webrtc.on('signal', signalHandler);
|
||||
|
||||
// Listen for connection success
|
||||
const connectHandler = (peerId) => {
|
||||
if (peerId === nodeId) {
|
||||
this.webrtc.removeListener('signal', signalHandler);
|
||||
this.webrtc.removeListener('peerConnected', connectHandler);
|
||||
ws.close();
|
||||
resolve(true);
|
||||
}
|
||||
};
|
||||
|
||||
this.webrtc.on('peerConnected', connectHandler);
|
||||
});
|
||||
|
||||
ws.on('message', async (data) => {
|
||||
const message = JSON.parse(data.toString());
|
||||
|
||||
if (message.type === 'signal' && message.from === nodeId) {
|
||||
await this.webrtc.handleSignal(nodeId, message.payload);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('error', reject);
|
||||
|
||||
setTimeout(() => reject(new Error('Connection timeout')), 10000);
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(`Failed to connect to peer ${nodeId}:`, error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async sendSignalingMessage(peerId, message) {
|
||||
// In a real implementation, this would route through a signaling server
|
||||
// For now, we'll use direct WebSocket connections
|
||||
}
|
||||
|
||||
handleMessage(from, message) {
|
||||
const { id: messageId, type, payload, route } = message;
|
||||
|
||||
// Prevent message loops
|
||||
if (this.messageHistory.has(messageId)) {
|
||||
return;
|
||||
}
|
||||
this.messageHistory.add(messageId);
|
||||
|
||||
// Clean old message history
|
||||
if (this.messageHistory.size > 1000) {
|
||||
const oldMessages = Array.from(this.messageHistory).slice(0, 500);
|
||||
oldMessages.forEach(id => this.messageHistory.delete(id));
|
||||
}
|
||||
|
||||
console.log(chalk.magenta(`📨 Received ${type} from ${from.substring(0, 8)}`));
|
||||
|
||||
switch (type) {
|
||||
case 'ring-message':
|
||||
this.handleRingMessage(from, payload, route, messageId);
|
||||
break;
|
||||
case 'oracle-query':
|
||||
if (this.isOracle) {
|
||||
this.handleOracleQuery(from, payload, messageId);
|
||||
}
|
||||
break;
|
||||
case 'oracle-response':
|
||||
this.emit('oracleResponse', { from, payload });
|
||||
break;
|
||||
case 'network-update':
|
||||
this.handleNetworkUpdate(from, payload);
|
||||
break;
|
||||
default:
|
||||
this.emit('message', { from, type, payload });
|
||||
}
|
||||
}
|
||||
|
||||
handleRingMessage(from, payload, route, messageId) {
|
||||
this.emit('ringMessage', { from, payload });
|
||||
|
||||
// Forward message along the ring
|
||||
this.forwardRingMessage({
|
||||
id: messageId,
|
||||
type: 'ring-message',
|
||||
payload,
|
||||
route: [...(route || []), this.id]
|
||||
}, from);
|
||||
}
|
||||
|
||||
forwardRingMessage(message, excludeFrom) {
|
||||
const route = message.route || [];
|
||||
|
||||
// Forward in inner ring
|
||||
if (this.rings.inner.right && this.rings.inner.right !== excludeFrom) {
|
||||
this.webrtc.sendMessage(this.rings.inner.right, message);
|
||||
}
|
||||
|
||||
// Forward in outer ring
|
||||
if (this.rings.outer.right && this.rings.outer.right !== excludeFrom) {
|
||||
this.webrtc.sendMessage(this.rings.outer.right, message);
|
||||
}
|
||||
}
|
||||
|
||||
sendRingMessage(payload, ring = 'both') {
|
||||
const message = {
|
||||
id: uuidv4(),
|
||||
type: 'ring-message',
|
||||
payload,
|
||||
route: [this.id]
|
||||
};
|
||||
|
||||
let sent = 0;
|
||||
|
||||
if (ring === 'inner' || ring === 'both') {
|
||||
if (this.rings.inner.right) {
|
||||
if (this.webrtc.sendMessage(this.rings.inner.right, message)) {
|
||||
sent++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (ring === 'outer' || ring === 'both') {
|
||||
if (this.rings.outer.right) {
|
||||
if (this.webrtc.sendMessage(this.rings.outer.right, message)) {
|
||||
sent++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
console.log(chalk.green(`📤 Sent ring message to ${sent} peers`));
|
||||
return sent > 0;
|
||||
}
|
||||
|
||||
async joinRing(bootstrapNode) {
|
||||
try {
|
||||
console.log(chalk.yellow(`🔄 Attempting to join ring via ${bootstrapNode}`));
|
||||
|
||||
const success = await this.connectToPeer('bootstrap', bootstrapNode);
|
||||
if (success) {
|
||||
// Request to join the ring network
|
||||
this.sendJoinRingRequest();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
} catch (error) {
|
||||
console.error('Failed to join ring:', error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
sendJoinRingRequest() {
|
||||
const message = {
|
||||
id: uuidv4(),
|
||||
type: 'join-ring',
|
||||
from: this.id,
|
||||
payload: {
|
||||
nodeId: this.id,
|
||||
port: this.port,
|
||||
isOracle: this.isOracle
|
||||
}
|
||||
};
|
||||
|
||||
this.webrtc.broadcast(message);
|
||||
}
|
||||
|
||||
async handleJoinRingRequest(ws, payload) {
|
||||
const { nodeId, port, isOracle } = payload;
|
||||
|
||||
console.log(chalk.cyan(`🔗 New node ${nodeId.substring(0, 8)} wants to join the ring`));
|
||||
|
||||
// Add to known nodes
|
||||
this.knownNodes.set(nodeId, { port, isOracle });
|
||||
|
||||
if (isOracle) {
|
||||
this.oracleNodes.add(nodeId);
|
||||
}
|
||||
|
||||
// Find optimal position in rings for the new node
|
||||
await this.integrateNewNode(nodeId);
|
||||
}
|
||||
|
||||
async integrateNewNode(nodeId) {
|
||||
// Simple integration: connect as right neighbor in both rings
|
||||
// In a production system, this would be more sophisticated
|
||||
|
||||
const oldInnerRight = this.rings.inner.right;
|
||||
const oldOuterRight = this.rings.outer.right;
|
||||
|
||||
// Update ring connections
|
||||
this.rings.inner.right = nodeId;
|
||||
this.rings.outer.right = nodeId;
|
||||
|
||||
// Notify the network of topology change
|
||||
this.broadcastNetworkUpdate();
|
||||
|
||||
console.log(chalk.green(`✅ Integrated node ${nodeId.substring(0, 8)} into rings`));
|
||||
}
|
||||
|
||||
handlePeerDisconnection(peerId) {
|
||||
// Update ring topology when a peer disconnects
|
||||
if (this.rings.inner.left === peerId) {
|
||||
this.rings.inner.left = null;
|
||||
}
|
||||
if (this.rings.inner.right === peerId) {
|
||||
this.rings.inner.right = null;
|
||||
}
|
||||
if (this.rings.outer.left === peerId) {
|
||||
this.rings.outer.left = null;
|
||||
}
|
||||
if (this.rings.outer.right === peerId) {
|
||||
this.rings.outer.right = null;
|
||||
}
|
||||
|
||||
this.knownNodes.delete(peerId);
|
||||
this.oracleNodes.delete(peerId);
|
||||
|
||||
this.broadcastNetworkUpdate();
|
||||
}
|
||||
|
||||
broadcastNetworkUpdate() {
|
||||
const message = {
|
||||
id: uuidv4(),
|
||||
type: 'network-update',
|
||||
payload: {
|
||||
nodeId: this.id,
|
||||
rings: this.rings,
|
||||
knownNodes: Array.from(this.knownNodes.keys()),
|
||||
oracleNodes: Array.from(this.oracleNodes)
|
||||
}
|
||||
};
|
||||
|
||||
this.webrtc.broadcast(message);
|
||||
}
|
||||
|
||||
handleNetworkUpdate(from, payload) {
|
||||
const { rings, knownNodes, oracleNodes } = payload;
|
||||
|
||||
// Update our knowledge of network topology
|
||||
knownNodes.forEach(nodeId => {
|
||||
if (!this.knownNodes.has(nodeId)) {
|
||||
this.knownNodes.set(nodeId, {});
|
||||
}
|
||||
});
|
||||
|
||||
oracleNodes.forEach(nodeId => {
|
||||
this.oracleNodes.add(nodeId);
|
||||
});
|
||||
|
||||
this.emit('networkUpdate', { from, payload });
|
||||
}
|
||||
|
||||
// Oracle-specific methods
|
||||
handleOracleQuery(from, payload, messageId) {
|
||||
console.log(chalk.yellow(`🔮 Oracle query from ${from.substring(0, 8)}: ${payload.query}`));
|
||||
|
||||
// Process oracle query and send response
|
||||
const response = this.processOracleQuery(payload);
|
||||
|
||||
const responseMessage = {
|
||||
id: uuidv4(),
|
||||
type: 'oracle-response',
|
||||
payload: {
|
||||
queryId: messageId,
|
||||
response,
|
||||
timestamp: Date.now()
|
||||
}
|
||||
};
|
||||
|
||||
this.webrtc.sendMessage(from, responseMessage);
|
||||
}
|
||||
|
||||
processOracleQuery(payload) {
|
||||
const { query, data } = payload;
|
||||
|
||||
// Simple oracle responses - extend this for your specific use case
|
||||
switch (query) {
|
||||
case 'network-status':
|
||||
return {
|
||||
connectedPeers: this.webrtc.getConnectedPeers().length,
|
||||
knownNodes: this.knownNodes.size,
|
||||
oracleNodes: this.oracleNodes.size,
|
||||
rings: this.rings
|
||||
};
|
||||
case 'timestamp':
|
||||
return { timestamp: Date.now() };
|
||||
case 'echo':
|
||||
return { echo: data };
|
||||
default:
|
||||
return { error: 'Unknown query type' };
|
||||
}
|
||||
}
|
||||
|
||||
queryOracle(query, data = null) {
|
||||
const oracleList = Array.from(this.oracleNodes);
|
||||
if (oracleList.length === 0) {
|
||||
return Promise.reject(new Error('No oracle nodes available'));
|
||||
}
|
||||
|
||||
// Query first available oracle
|
||||
const oracleId = oracleList[0];
|
||||
const message = {
|
||||
id: uuidv4(),
|
||||
type: 'oracle-query',
|
||||
payload: { query, data }
|
||||
};
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
reject(new Error('Oracle query timeout'));
|
||||
}, 5000);
|
||||
|
||||
const responseHandler = ({ from, payload }) => {
|
||||
if (from === oracleId) {
|
||||
clearTimeout(timeout);
|
||||
this.removeListener('oracleResponse', responseHandler);
|
||||
resolve(payload.response);
|
||||
}
|
||||
};
|
||||
|
||||
this.on('oracleResponse', responseHandler);
|
||||
|
||||
if (!this.webrtc.sendMessage(oracleId, message)) {
|
||||
clearTimeout(timeout);
|
||||
this.removeListener('oracleResponse', responseHandler);
|
||||
reject(new Error('Failed to send oracle query'));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
getNetworkInfo() {
|
||||
return {
|
||||
nodeId: this.id,
|
||||
port: this.port,
|
||||
isOracle: this.isOracle,
|
||||
rings: this.rings,
|
||||
connectedPeers: this.webrtc.getConnectedPeers(),
|
||||
knownNodes: Array.from(this.knownNodes.keys()),
|
||||
oracleNodes: Array.from(this.oracleNodes)
|
||||
};
|
||||
}
|
||||
|
||||
async destroy() {
|
||||
console.log(chalk.red(`🛑 Shutting down node ${this.id.substring(0, 8)}`));
|
||||
|
||||
// Notify peers of shutdown
|
||||
this.webrtc.broadcast({
|
||||
id: uuidv4(),
|
||||
type: 'node-shutdown',
|
||||
payload: { nodeId: this.id }
|
||||
});
|
||||
|
||||
// Clean up connections
|
||||
this.webrtc.destroy();
|
||||
|
||||
if (this.discoveryServer) {
|
||||
this.discoveryServer.close();
|
||||
}
|
||||
|
||||
this.removeAllListeners();
|
||||
}
|
||||
}
|
||||
160
src/webrtc-manager.js
Archivo normal
160
src/webrtc-manager.js
Archivo normal
@@ -0,0 +1,160 @@
|
||||
import SimplePeer from 'simple-peer';
|
||||
import { EventEmitter } from 'events';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
|
||||
export class WebRTCManager extends EventEmitter {
|
||||
constructor(nodeId) {
|
||||
super();
|
||||
this.nodeId = nodeId;
|
||||
this.connections = new Map();
|
||||
this.pendingConnections = new Map();
|
||||
}
|
||||
|
||||
async createConnection(peerId, isInitiator = false) {
|
||||
const connectionId = `${this.nodeId}-${peerId}`;
|
||||
|
||||
if (this.connections.has(connectionId)) {
|
||||
return this.connections.get(connectionId);
|
||||
}
|
||||
|
||||
const peer = new SimplePeer({
|
||||
initiator: isInitiator,
|
||||
trickle: false,
|
||||
config: {
|
||||
iceServers: [
|
||||
{ urls: 'stun:stun.l.google.com:19302' },
|
||||
{ urls: 'stun:stun1.l.google.com:19302' }
|
||||
]
|
||||
}
|
||||
});
|
||||
|
||||
const connection = {
|
||||
id: connectionId,
|
||||
peerId,
|
||||
peer,
|
||||
isInitiator,
|
||||
state: 'connecting'
|
||||
};
|
||||
|
||||
// Handle signaling data
|
||||
peer.on('signal', (data) => {
|
||||
this.emit('signal', {
|
||||
peerId,
|
||||
signal: data
|
||||
});
|
||||
});
|
||||
|
||||
// Handle connection
|
||||
peer.on('connect', () => {
|
||||
connection.state = 'connected';
|
||||
console.log(`WebRTC connection established with ${peerId.substring(0, 8)}`);
|
||||
this.emit('peerConnected', peerId);
|
||||
});
|
||||
|
||||
// Handle data
|
||||
peer.on('data', (data) => {
|
||||
try {
|
||||
const message = JSON.parse(data.toString());
|
||||
this.emit('message', {
|
||||
from: peerId,
|
||||
message
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Error parsing message:', error);
|
||||
}
|
||||
});
|
||||
|
||||
// Handle errors
|
||||
peer.on('error', (error) => {
|
||||
console.error(`WebRTC error with ${peerId}:`, error.message);
|
||||
connection.state = 'failed';
|
||||
this.removePeer(peerId);
|
||||
});
|
||||
|
||||
// Handle close
|
||||
peer.on('close', () => {
|
||||
console.log(`WebRTC connection closed with ${peerId.substring(0, 8)}`);
|
||||
connection.state = 'disconnected';
|
||||
this.removePeer(peerId);
|
||||
});
|
||||
|
||||
this.connections.set(connectionId, connection);
|
||||
return connection;
|
||||
}
|
||||
|
||||
async handleSignal(peerId, signal) {
|
||||
const connectionId = `${this.nodeId}-${peerId}`;
|
||||
const connection = this.connections.get(connectionId);
|
||||
|
||||
if (connection && connection.peer) {
|
||||
connection.peer.signal(signal);
|
||||
}
|
||||
}
|
||||
|
||||
sendMessage(peerId, message) {
|
||||
const connectionId = `${this.nodeId}-${peerId}`;
|
||||
const connection = this.connections.get(connectionId);
|
||||
|
||||
if (connection && connection.peer && connection.peer.connected) {
|
||||
try {
|
||||
connection.peer.send(JSON.stringify(message));
|
||||
return true;
|
||||
} catch (error) {
|
||||
console.error(`Error sending message to ${peerId}:`, error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
broadcast(message, excludePeers = []) {
|
||||
let sent = 0;
|
||||
for (const [connectionId, connection] of this.connections) {
|
||||
if (!excludePeers.includes(connection.peerId) &&
|
||||
connection.peer &&
|
||||
connection.peer.connected) {
|
||||
try {
|
||||
connection.peer.send(JSON.stringify(message));
|
||||
sent++;
|
||||
} catch (error) {
|
||||
console.error(`Error broadcasting to ${connection.peerId}:`, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
return sent;
|
||||
}
|
||||
|
||||
removePeer(peerId) {
|
||||
const connectionId = `${this.nodeId}-${peerId}`;
|
||||
const connection = this.connections.get(connectionId);
|
||||
|
||||
if (connection) {
|
||||
if (connection.peer) {
|
||||
connection.peer.destroy();
|
||||
}
|
||||
this.connections.delete(connectionId);
|
||||
this.emit('peerDisconnected', peerId);
|
||||
}
|
||||
}
|
||||
|
||||
getConnectedPeers() {
|
||||
return Array.from(this.connections.values())
|
||||
.filter(conn => conn.peer && conn.peer.connected)
|
||||
.map(conn => conn.peerId);
|
||||
}
|
||||
|
||||
getConnectionState(peerId) {
|
||||
const connectionId = `${this.nodeId}-${peerId}`;
|
||||
const connection = this.connections.get(connectionId);
|
||||
return connection ? connection.state : 'disconnected';
|
||||
}
|
||||
|
||||
destroy() {
|
||||
for (const [connectionId, connection] of this.connections) {
|
||||
if (connection.peer) {
|
||||
connection.peer.destroy();
|
||||
}
|
||||
}
|
||||
this.connections.clear();
|
||||
}
|
||||
}
|
||||
Referencia en una nueva incidencia
Block a user