Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSE: Close connection automatically once all messages matched #121

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions src/steps/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ export default async function (
) {
const stepResult: StepRunResult = {
type: 'sse',
request : {
url: params.url,
headers: params.headers,
size: 0,
}
}

const ssw = new co2()
Expand All @@ -81,18 +86,16 @@ export default async function (
})

const messages: MessageEvent[] = []
const expectedMessages: Set<string> | undefined = params.check?.messages
? new Set(params.check?.messages?.map((m) => m.id))
: undefined

const timeout = setTimeout(() => {
// Closes the `EventSource` and exits as "passed"
const end = () => {
ev.close()

const messagesBuffer = Buffer.from(messages.map((m) => m.data).join('\n'))

stepResult.request = {
url: params.url,
headers: params.headers,
size: 0,
}

stepResult.response = {
contentType: 'text/event-stream',
body: messagesBuffer,
Expand All @@ -103,12 +106,35 @@ export default async function (
}

resolve(true)
}

const timeout = setTimeout(() => {
console.debug('SSE timed out')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No debug please, return an error if you this should be visible in the CLI

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeout was considered a test pass, I didn't want to change the behavior (see #121 (comment)). How should I log this then?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it times out and no message was received during the time, the test should be marked as failed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No message at all or not all expected messages if applicable? (I have my idea, I just want to follow yours)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which one? 😅

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not every expected message matched or no messages matched and a timeout occurs, the checks for them should fail - see current behaviour.

end()
}, params.timeout || 10000)

ev.onerror = (error) => {
clearTimeout(timeout)

let message: string
if (ev.readyState === EventSource.CLOSED) {
// SSE stream closed gracefully
return end()
} else if (ev.readyState === EventSource.CONNECTING) {
// SSE stream closed by the server
if (expectedMessages === undefined) {
message = 'The SSE stream was closed by the server. If this is expected behavior, please use [`tests.<test>.steps.[step].sse.check.messages`](https://docs.stepci.com/reference/workflow-syntax.html#tests-test-steps-step-sse-check-messages-message).'
} else {
message = `The SSE stream was closed by the server before all expected messages were received. Missing IDs: ${JSON.stringify([...expectedMessages], null, 2)}`
}
} else {
// SSE stream is still open (`ev.readyState === EventSource.OPEN`)
// but received an "error" event from the server
message = `The SSE stream received an error event from the server: ${JSON.stringify(error, null, 2)}`
}

ev.close()
reject(error)
reject({ ...error, message })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it look good in the CLI?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it does, it integrates with stepci reading the message key. That's why I did it in the first place (it printed an unhelpful "undefined" before, as I stated in the first paragraph of this PR description).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I let you resolve conversations)

}

if (params.check) {
Expand All @@ -129,6 +155,9 @@ export default async function (

if (params.check) {
params.check.messages?.forEach((check, id) => {
// Don't run check if it's not intended for this message
if (check.id !== message.lastEventId) return

if (check.body) {
const result = checkResult(message.data, check.body)
if (result.passed && stepResult.checks?.messages)
Expand Down Expand Up @@ -178,7 +207,7 @@ export default async function (
.map((c: CheckResult) => c.passed)
.every((passed) => passed)

if (passed && stepResult.checks?.messages)
if (stepResult.checks?.messages)
(stepResult.checks.messages as CheckResults)[check.id] = {
expected: check.jsonpath,
given: jsonpathResult,
Expand All @@ -190,6 +219,15 @@ export default async function (
}
})
}

// Mark message as received
expectedMessages?.delete(message.lastEventId)
// If all expected messages have been received, close connection and return as "passed"
if (expectedMessages?.size === 0) {
// console.debug('All expected messages received, closing connection…')
clearTimeout(timeout)
end()
}
}
})

Expand Down