@@ -32,65 +32,106 @@ class StreamParser extends EventEmitter {
|
||||
}
|
||||
|
||||
parse() {
|
||||
// Find complete stanzas in buffer
|
||||
let startIdx = 0;
|
||||
let inTag = false;
|
||||
let tagDepth = 0;
|
||||
if (!this.buffer.length) return;
|
||||
|
||||
for (let i = 0; i < this.buffer.length; i++) {
|
||||
const char = this.buffer[i];
|
||||
// Handle stream start first (only once)
|
||||
if (!this.streamStarted) {
|
||||
// Look for stream:stream tag anywhere in buffer (skip XML declaration)
|
||||
const streamMatch = this.buffer.match(/<stream:stream[^>]*>/);
|
||||
if (!streamMatch) {
|
||||
// Stream opening tag not complete yet
|
||||
return;
|
||||
}
|
||||
|
||||
if (char === '<') {
|
||||
if (this.buffer.substring(i, i + 2) === '</') {
|
||||
// Closing tag
|
||||
tagDepth--;
|
||||
} else if (this.buffer[i + 1] !== '!' && this.buffer[i + 1] !== '?') {
|
||||
// Opening tag
|
||||
if (!this.streamStarted && this.buffer.substring(i).match(/^<stream:stream/)) {
|
||||
// Parse stream start
|
||||
const streamEndIdx = this.buffer.indexOf('>', i);
|
||||
if (streamEndIdx !== -1) {
|
||||
const streamTag = this.buffer.substring(i, streamEndIdx + 1);
|
||||
try {
|
||||
const attrs = this.parseAttributes(streamTag);
|
||||
this.streamStarted = true;
|
||||
this.emit('streamStart', attrs);
|
||||
this.buffer = this.buffer.substring(streamEndIdx + 1);
|
||||
i = -1;
|
||||
startIdx = 0;
|
||||
} catch (e) {
|
||||
this.emit('error', e);
|
||||
}
|
||||
const streamTag = streamMatch[0];
|
||||
const streamIndex = this.buffer.indexOf(streamTag);
|
||||
|
||||
try {
|
||||
const attrs = this.parseAttributes(streamTag);
|
||||
this.streamStarted = true;
|
||||
this.emit('streamStart', attrs);
|
||||
// Keep everything after the stream tag
|
||||
this.buffer = this.buffer.substring(streamIndex + streamTag.length);
|
||||
// Recursively parse any stanzas that follow
|
||||
this.parse();
|
||||
} catch (e) {
|
||||
this.emit('error', new Error(`Stream parse error: ${e.message}`));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Parse stanzas (after stream is started)
|
||||
let index = 0;
|
||||
|
||||
while (index < this.buffer.length) {
|
||||
// Skip whitespace
|
||||
while (index < this.buffer.length && /\s/.test(this.buffer[index])) {
|
||||
index++;
|
||||
}
|
||||
|
||||
if (index >= this.buffer.length) break;
|
||||
|
||||
if (this.buffer[index] === '<') {
|
||||
// Check for stream closing
|
||||
if (this.buffer.substring(index).startsWith('</stream:stream>')) {
|
||||
this.emit('streamEnd');
|
||||
this.buffer = '';
|
||||
return;
|
||||
}
|
||||
|
||||
// Try to extract a complete stanza
|
||||
let closeIndex = -1;
|
||||
let depth = 0;
|
||||
let inOpenTag = true;
|
||||
|
||||
for (let i = index; i < this.buffer.length; i++) {
|
||||
const char = this.buffer[i];
|
||||
|
||||
if (char === '<') {
|
||||
// Check if it's a closing tag
|
||||
if (this.buffer[i + 1] === '/') {
|
||||
depth--;
|
||||
} else if (this.buffer[i + 1] !== '!' && this.buffer[i + 1] !== '?') {
|
||||
depth++;
|
||||
}
|
||||
} else {
|
||||
tagDepth++;
|
||||
inOpenTag = true;
|
||||
} else if (char === '>' && inOpenTag) {
|
||||
// Check if self-closing
|
||||
if (this.buffer[i - 1] === '/') {
|
||||
depth--;
|
||||
}
|
||||
|
||||
if (depth === 0 && this.buffer[index + 1] !== '/') {
|
||||
// Found end of stanza
|
||||
closeIndex = i;
|
||||
break;
|
||||
}
|
||||
inOpenTag = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (tagDepth > 0) {
|
||||
inTag = true;
|
||||
if (closeIndex === -1) {
|
||||
// Stanza not complete, wait for more data
|
||||
break;
|
||||
}
|
||||
} else if (char === '>' && inTag) {
|
||||
tagDepth--;
|
||||
|
||||
if (tagDepth === 0) {
|
||||
// Complete stanza
|
||||
const stanzaStr = this.buffer.substring(startIdx, i + 1);
|
||||
if (stanzaStr.trim()) {
|
||||
try {
|
||||
const stanza = ltx.parse(stanzaStr);
|
||||
this.emit('stanza', stanza);
|
||||
} catch (e) {
|
||||
this.emit('error', e);
|
||||
}
|
||||
}
|
||||
startIdx = i + 1;
|
||||
inTag = false;
|
||||
// Extract stanza
|
||||
const stanzaStr = this.buffer.substring(index, closeIndex + 1);
|
||||
try {
|
||||
const stanza = ltx.parse(stanzaStr);
|
||||
this.emit('stanza', stanza);
|
||||
index = closeIndex + 1;
|
||||
} catch (e) {
|
||||
// Invalid stanza, skip this byte
|
||||
index++;
|
||||
}
|
||||
} else {
|
||||
index++;
|
||||
}
|
||||
}
|
||||
|
||||
this.buffer = this.buffer.substring(startIdx);
|
||||
// Keep unparsed data in buffer
|
||||
this.buffer = this.buffer.substring(index);
|
||||
}
|
||||
|
||||
parseAttributes(tagStr) {
|
||||
@@ -190,7 +231,7 @@ class XMPPStream extends EventEmitter {
|
||||
}
|
||||
|
||||
handleStreamStart(attrs) {
|
||||
this.logger.debug('Stream start received:', attrs);
|
||||
this.logger.info('Stream start received:', attrs);
|
||||
|
||||
if (this.state === 'wait-for-stream') {
|
||||
// Send stream response
|
||||
|
||||
Referencia en una nueva incidencia
Block a user