Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSE: Close connection automatically once all messages matched #121

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions src/steps/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ export default async function (
) {
const stepResult: StepRunResult = {
type: 'sse',
request : {
url: params.url,
headers: params.headers,
size: 0,
}
}

const ssw = new co2()
Expand All @@ -81,18 +86,16 @@ export default async function (
})

const messages: MessageEvent[] = []
const expectedMessages: Set<string> | undefined = params.check?.messages
? new Set(params.check?.messages?.map((m) => m.id))
: undefined

const timeout = setTimeout(() => {
// Closes the `EventSource` and exits as "passed"
function end () {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use arrow functions for inline functions

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to const end = () => {, is it what you meant?

ev.close()

const messagesBuffer = Buffer.from(messages.map((m) => m.data).join('\n'))

stepResult.request = {
url: params.url,
headers: params.headers,
size: 0,
}

stepResult.response = {
contentType: 'text/event-stream',
body: messagesBuffer,
Expand All @@ -103,12 +106,35 @@ export default async function (
}

resolve(true)
}

const timeout = setTimeout(() => {
console.debug(`SSE timed out`)
Copy link
Member Author

@RemiBardon RemiBardon Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a debug log here, as I personally don't think it makes sense to mark the step as "passed" if the SSE timed out (which was the case in the current code, so I didn't change it). To make sure people notice this, I think it's acceptable to leave a warning. Maybe you'd like to use some logging utilities Step CI uses instead of console.debug, but I haven't found them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the quotes for single quotes but left the log.

end()
}, params.timeout || 10000)

ev.onerror = (error) => {
clearTimeout(timeout)

let message: string
if (ev.readyState === EventSource.CLOSED) {
// SSE stream closed gracefully
return end()
} else if (ev.readyState === EventSource.CONNECTING) {
// SSE stream closed by the server
if (expectedMessages === undefined) {
message = 'The SSE stream was closed by the server. If this is expected behavior, please use [`tests.<test>.steps.[step].sse.check.messages`](https://docs.stepci.com/reference/workflow-syntax.html#tests-test-steps-step-sse-check-messages-message).'
} else {
message = `The SSE stream was closed by the server before all expected messages were received. Missing IDs: ${JSON.stringify([...expectedMessages], null, 2)}`
}
} else {
// SSE stream is still open (`ev.readyState === EventSource.OPEN`)
// but received an "error" event from the server
message = `The SSE stream received an error event from the server: ${JSON.stringify(error, null, 2)}`
}

ev.close()
reject(error)
reject({ ...error, message })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it look good in the CLI?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it does, it integrates with stepci reading the message key. That's why I did it in the first place (it printed an unhelpful "undefined" before, as I stated in the first paragraph of this PR description).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I let you resolve conversations)

}

if (params.check) {
Expand All @@ -127,6 +153,15 @@ export default async function (
ev.onmessage = (message) => {
messages.push(message)

// Mark message as received
expectedMessages?.delete(message.lastEventId)
// If all expected messages have been received, close connection and return as "passed"
if (expectedMessages?.size === 0) {
// console.debug('All expected messages received, closing connection…')
clearTimeout(timeout)
end()
}

if (params.check) {
params.check.messages?.forEach((check, id) => {
if (check.body) {
Expand Down