From a750806ba71c0a1d66e5addbe06edc6f42551c3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Bardon?= Date: Wed, 12 Jun 2024 19:38:29 +0200 Subject: [PATCH 1/6] SSE: Close connection automatically once all messages matched --- src/steps/sse.ts | 51 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/src/steps/sse.ts b/src/steps/sse.ts index fc3587e..5f2579a 100644 --- a/src/steps/sse.ts +++ b/src/steps/sse.ts @@ -60,6 +60,11 @@ export default async function ( ) { const stepResult: StepRunResult = { type: 'sse', + request : { + url: params.url, + headers: params.headers, + size: 0, + } } const ssw = new co2() @@ -81,18 +86,16 @@ export default async function ( }) const messages: MessageEvent[] = [] + const expectedMessages: Set | undefined = params.check?.messages + ? new Set(params.check?.messages?.map((m) => m.id)) + : undefined - const timeout = setTimeout(() => { + // Closes the `EventSource` and exits as "passed" + function end () { ev.close() const messagesBuffer = Buffer.from(messages.map((m) => m.data).join('\n')) - stepResult.request = { - url: params.url, - headers: params.headers, - size: 0, - } - stepResult.response = { contentType: 'text/event-stream', body: messagesBuffer, @@ -103,12 +106,35 @@ export default async function ( } resolve(true) + } + + const timeout = setTimeout(() => { + console.debug(`SSE timed out`) + end() }, params.timeout || 10000) ev.onerror = (error) => { clearTimeout(timeout) + + let message: string + if (ev.readyState === EventSource.CLOSED) { + // SSE stream closed gracefully + return end() + } else if (ev.readyState === EventSource.CONNECTING) { + // SSE stream closed by the server + if (expectedMessages === undefined) { + message = 'The SSE stream was closed by the server. If this is expected behavior, please use [`tests..steps.[step].sse.check.messages`](https://docs.stepci.com/reference/workflow-syntax.html#tests-test-steps-step-sse-check-messages-message).' + } else { + message = `The SSE stream was closed by the server before all expected messages were received. Missing IDs: ${JSON.stringify([...expectedMessages], null, 2)}` + } + } else { + // SSE stream is still open (`ev.readyState === EventSource.OPEN`) + // but received an "error" event from the server + message = `The SSE stream received an error event from the server: ${JSON.stringify(error, null, 2)}` + } + ev.close() - reject(error) + reject({ ...error, message }) } if (params.check) { @@ -127,6 +153,15 @@ export default async function ( ev.onmessage = (message) => { messages.push(message) + // Mark message as received + expectedMessages?.delete(message.lastEventId) + // If all expected messages have been received, close connection and return as "passed" + if (expectedMessages?.size === 0) { + // console.debug('All expected messages received, closing connection…') + clearTimeout(timeout) + end() + } + if (params.check) { params.check.messages?.forEach((check, id) => { if (check.body) { From a90e9ddc0b3dc36a54f98a1226cc612e73713d5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Bardon?= Date: Fri, 21 Jun 2024 01:02:07 +0200 Subject: [PATCH 2/6] SSE: Don't run checks destined to another event --- src/steps/sse.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/steps/sse.ts b/src/steps/sse.ts index 5f2579a..64e7e72 100644 --- a/src/steps/sse.ts +++ b/src/steps/sse.ts @@ -164,6 +164,9 @@ export default async function ( if (params.check) { params.check.messages?.forEach((check, id) => { + // Don't run check if it's not intended for this message + if (check.id !== message.lastEventId) return + if (check.body) { const result = checkResult(message.data, check.body) if (result.passed && stepResult.checks?.messages) From 7ac5b37a80fd9e15df512281a6f8d8f03d2c3034 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Bardon?= Date: Fri, 21 Jun 2024 01:02:44 +0200 Subject: [PATCH 3/6] SSE: Replace useless backticks by single quotes --- src/steps/sse.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/steps/sse.ts b/src/steps/sse.ts index 64e7e72..d3f81e3 100644 --- a/src/steps/sse.ts +++ b/src/steps/sse.ts @@ -109,7 +109,7 @@ export default async function ( } const timeout = setTimeout(() => { - console.debug(`SSE timed out`) + console.debug('SSE timed out') end() }, params.timeout || 10000) From 3a47b891818583298c8857dd85f23d91b9fbe79a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Bardon?= Date: Fri, 21 Jun 2024 01:04:17 +0200 Subject: [PATCH 4/6] SSE: Fix debug logs when `jsonpath` checks fail --- src/steps/sse.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/steps/sse.ts b/src/steps/sse.ts index d3f81e3..326f0c8 100644 --- a/src/steps/sse.ts +++ b/src/steps/sse.ts @@ -216,7 +216,7 @@ export default async function ( .map((c: CheckResult) => c.passed) .every((passed) => passed) - if (passed && stepResult.checks?.messages) + if (stepResult.checks?.messages) (stepResult.checks.messages as CheckResults)[check.id] = { expected: check.jsonpath, given: jsonpathResult, From 9abc08bba59af103908429650630e5b07b04fac6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Bardon?= Date: Fri, 21 Jun 2024 01:05:48 +0200 Subject: [PATCH 5/6] SSE: Resolve future after checks ran --- src/steps/sse.ts | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/steps/sse.ts b/src/steps/sse.ts index 326f0c8..9125e27 100644 --- a/src/steps/sse.ts +++ b/src/steps/sse.ts @@ -153,15 +153,6 @@ export default async function ( ev.onmessage = (message) => { messages.push(message) - // Mark message as received - expectedMessages?.delete(message.lastEventId) - // If all expected messages have been received, close connection and return as "passed" - if (expectedMessages?.size === 0) { - // console.debug('All expected messages received, closing connection…') - clearTimeout(timeout) - end() - } - if (params.check) { params.check.messages?.forEach((check, id) => { // Don't run check if it's not intended for this message @@ -228,6 +219,15 @@ export default async function ( } }) } + + // Mark message as received + expectedMessages?.delete(message.lastEventId) + // If all expected messages have been received, close connection and return as "passed" + if (expectedMessages?.size === 0) { + // console.debug('All expected messages received, closing connection…') + clearTimeout(timeout) + end() + } } }) From 76cb563671ef557cc3492977df76941d628493b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Bardon?= Date: Sat, 22 Jun 2024 19:26:57 +0200 Subject: [PATCH 6/6] SSE: Use arrow notation for inline function `end` --- src/steps/sse.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/steps/sse.ts b/src/steps/sse.ts index 9125e27..09e141c 100644 --- a/src/steps/sse.ts +++ b/src/steps/sse.ts @@ -91,7 +91,7 @@ export default async function ( : undefined // Closes the `EventSource` and exits as "passed" - function end () { + const end = () => { ev.close() const messagesBuffer = Buffer.from(messages.map((m) => m.data).join('\n'))