Skip to content

Commit

Permalink
Error responses will now reject the sendMessage promise instead of re…
Browse files Browse the repository at this point in the history
…solving them.

Additionally, promises are now more aggressive about rejecting on unexpected errors, to avoid situations where promises can await forever.
  • Loading branch information
jlivak committed Jul 3, 2024
1 parent 644e313 commit bdf51f5
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 40 deletions.
37 changes: 25 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Full Documentation: https://inkarnaterpg.github.io/phoenix-websocket/
- Allows defining multiple message handler functions that are called depending on the content of received messages.
- Opinionated in assuming the server will send pre-defined messages with variable payloads.
- Only supports the WebSocket protocol, does not have a LongPoll fallback.
- Does not include built in Presence support
- Does not include built in Presence support.
- Configurable timeout which can help avoid flooding the dev console with connection errors.
- Built with TypeScript and includes definitions without a second dependency.

Expand Down Expand Up @@ -101,14 +101,19 @@ socket.subscribeToTopic("exampleTopic", {userId: "1"},

### Handling Error Responses to Topic Join Requests

If the server responds to a join request with an 'error 'status, the topic will not be added to your PhoenixWebsocket instance and the promise will be rejected with the payload of the error response. An example of handling a join error conditionally based on the message payload is:
If the server responds to a join request with an 'error' status, the topic will not be added to your PhoenixWebsocket instance and the promise will be rejected with a `PhoenixRespondedWithError` error object. An example of handling a join error conditionally based on the message payload is:

```typescript
socket.subscribeToTopic("exampleTopic", {userId: "1"}).catch((payload) => {
if (payload.errorMsg === "Invalid User") {
// Do something
}
socket.subscribeToTopic("exampleTopic", {userId: "1"}).catch((error) => {
if (error instanceof PhoenixRespondedWithError) {
if (error.reply.response.reason === "Invalid User") {
// Do something
}
}
else {
console.error(error)
}
}
)
```

Expand All @@ -129,17 +134,15 @@ socket.sendMessage("exampleTopic", "send_message", {message: "Hello!", attachmen
To handle a message response:
```typescript
socket.sendMessage("exampleTopic", "query_users").then((reply) => {
if (reply.status === 'ok') {
this.userList = [...reply.response.users]
}
this.userList = [...reply.response.users]
})
```

### Handling Errors With `.sendMessage()`

It's worth noting that the promise returned by `.sendMessage()` will resolve regardless of if the server responded with an 'ok' or 'error' status since we consider any response a valid response.
The promise returned by `.sendMessage()` will only resolve if the server responds with a successful ("ok") response. If the server returns an error response, then the promise will be rejected with a `PhoenixRespondedWithError` error. This error type contains the response body in case you need to implement further logic on error.

The `.sendMessage()` function may throw either a `PhoenixInvalidTopicError` or a `PhoenixInvalidStateError` if you try to send a message to a topic you aren't subscribed to, or if the websocket is not currently connected. These types are exported and can be used to catch these library-specific errors and handle them. For ex:
The `.sendMessage()` function may also throw either a `PhoenixInvalidTopicError` or a `PhoenixInvalidStateError` if you try to send a message to a topic you aren't subscribed to, or if the websocket is not currently connected. These types are exported and can be used to catch these library-specific errors and handle them. For ex:
```typescript
try {
socket.sendMessage("topicImNotSubscribedTo", "test")
Expand All @@ -154,7 +157,17 @@ catch (error) {
}
```

## Using in a Node Context

This library was primarily intended for use within a browser, so it relies on the browser WebSocket API. If you want to use this library in a node context, you will need to expose a node implementation of the WebSocket API, which can be done with the `ws`package.

After installing `ws`, add the following to the top of your entrypoint:
```typescript
import WebSocket from "ws"
(global as any).WebSocket ??= WebSocket
```

## Maintainer
This library is maintained by [Inkarnate](https://github.com/inkarnaterpg).

In addition, this library is currently in use by us in a high-traffic, production environment. While we strive to keep this library up-to-date and bug free, there may be use cases not used by us with unknown bugs. As such, Issue reports and PRs are most welcome.
In addition, this library is currently in use by us in a high-traffic, production environment. While we strive to keep this library up-to-date and bug free, there may be use cases we have not encountered with unknown bugs. As such, Issue reports and PRs are most welcome.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"type": "module",
"scripts": {
"build": "rollup -c",
"docs": "typedoc --entryPointStrategy resolve ./src/phoenix-websocket.ts --out docs/"
"docs": "typedoc --entryPointStrategy resolve ./src/phoenix-websocket.ts --out docs/",
"typecheck": "tsc --noEmit"
},
"repository": {
"type": "git",
Expand Down
95 changes: 71 additions & 24 deletions src/phoenix-websocket.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import { PhoenixInvalidStateError, PhoenixInvalidTopicError } from './types/errors'
import {
PhoenixInvalidStateError,
PhoenixInvalidTopicError,
PhoenixRespondedWithError,
PhoenixInternalServerError,
PhoenixConnectionError,
PhoenixDisconnectedError,
} from './types/errors'
import { PhoenixTopic, TopicMessageHandler, TopicStatuses } from './types/topic'
import { WebsocketStatuses } from './types/websocket-statuses'
import { PhoenixMessage } from './types/message'
import { PhoenixReply } from './types/reply'
import { PhoenixOkReply, PhoenixReply } from './types/reply'
import { PhoenixWebsocketLogLevels } from './types/log-levels'
import { ReplyQueueEntry } from './types/reply-queue-entry'

/**
* Represents a connection instance to a Phoenix Sockets endpoint.
Expand Down Expand Up @@ -36,11 +44,8 @@ export class PhoenixWebsocket {
public get connectionStatus(): WebsocketStatuses {
return this._connectionStatus
}
private replyQueue: Map<string, (reply: PhoenixReply) => void> = new Map<
string,
(reply: PhoenixReply) => void
>()
private heartbeatTimeout: number | undefined
private heartbeatReplyQueue: Map<string, ReplyQueueEntry> = new Map<string, ReplyQueueEntry>()
private reconnectionTimeout: number | undefined

private onConnectedResolvers: (() => void)[] = []
Expand Down Expand Up @@ -104,7 +109,7 @@ export class PhoenixWebsocket {
}
this.attemptReconnection()
}
this.disposeSocket()
this.disposeOnConnectionError()
this.onDisconnectedCallback?.()
}

Expand All @@ -127,16 +132,18 @@ export class PhoenixWebsocket {
this.reconnectionTimeout = undefined
}
}
this.disposeSocket()
this.disposeOnConnectionError()
this.onDisconnectedCallback?.()
}

private disposeSocket(): void {
private disposeOnConnectionError(): void {
if (this.heartbeatTimeout) {
clearTimeout(this.heartbeatTimeout)
this.heartbeatTimeout = undefined
}
this.replyQueue.clear()
for (let topic of this.topics.values()) {
topic.onConnectionError()
}
this.socket = undefined
}

Expand Down Expand Up @@ -175,9 +182,32 @@ export class PhoenixWebsocket {
: ({} as { [key: string]: any })
let response = new PhoenixMessage(topicId, messageId, topic, message, messageData)

if (response.messageId && response.message === 'phx_reply') {
this.replyQueue.get(response.messageId)?.(response.data as PhoenixReply)
this.replyQueue.delete(response.messageId)
if (response.topicId && response.messageId && response.message === 'phx_reply') {
if ((response.data as PhoenixReply)?.status === 'ok') {
if (this.heartbeatReplyQueue.has(response.messageId)) {
this.heartbeatReplyQueue.get(response.messageId)?.onReply(response.data as PhoenixOkReply)
this.heartbeatReplyQueue.delete(response.messageId)
} else {
this.topics
.get(response.topicId)
?.replyQueue.get(response.messageId)
?.onReply(response.data as PhoenixOkReply)
this.topics.get(response.topicId)?.replyQueue.delete(response.messageId)
}
} else {
if (this.heartbeatReplyQueue.has(response.messageId)) {
this.heartbeatReplyQueue
.get(response.messageId)
?.onError(new PhoenixRespondedWithError(response.data as PhoenixReply))
this.heartbeatReplyQueue.delete(response.messageId)
} else {
this.topics
.get(response.topicId)
?.replyQueue.get(response.messageId)
?.onError(new PhoenixRespondedWithError(response.data as PhoenixReply))
this.topics.get(response.topicId)?.replyQueue.delete(response.messageId)
}
}
} else if (response.topicId && response.message === 'phx_close') {
if (this.topics.has(response.topic ?? '')) {
this.topics.get(response.topic ?? '')!.status = TopicStatuses.Unsubscribed
Expand All @@ -195,6 +225,7 @@ export class PhoenixWebsocket {
}
let erroredTopic = this.topics.get(response.topic ?? '')!
erroredTopic.status = TopicStatuses.Unsubscribed
erroredTopic.onServerError()
this.topics.delete(response.topic ?? '')
this.subscribeToTopic(
erroredTopic.topic,
Expand Down Expand Up @@ -280,6 +311,8 @@ export class PhoenixWebsocket {
this._connectionStatus = WebsocketStatuses.Disconnecting
}

this.topics.forEach((t) => t.onConnectionClosed())

if (clearTopics) {
this.topics.clear()
}
Expand All @@ -299,6 +332,11 @@ export class PhoenixWebsocket {
* @param {string} topic - The topic to subscribe to.
* @param { { [key: string]: any } | undefined } payload - An optional payload object to be sent along with the join request.
* @param { [message: string]: TopicMessageHandler } messageHandlers - An optional object containing mappings of messages to message handler callbacks, which are called when the given message is received.
*
* @throws { PhoenixInternalServerError }
* @throws { PhoenixConnectionError }
* @throws { PhoenixDisconnectedError }
* @throws { PhoenixRespondedWithError }
*/
public subscribeToTopic(
topic: string,
Expand Down Expand Up @@ -373,6 +411,10 @@ export class PhoenixWebsocket {
*
* @throws { PhoenixInvalidTopicError }
* @throws { PhoenixInvalidStateError }
* @throws { PhoenixInternalServerError }
* @throws { PhoenixRespondedWithError }
* @throws { PhoenixDisconnectedError }
* @throws { PhoenixInternalServerError }
*/
public async sendMessage(
topic: string,
Expand Down Expand Up @@ -402,9 +444,12 @@ export class PhoenixWebsocket {
message,
payload
)
return await new Promise((resolve, _reject) => {
return await new Promise((resolve, reject) => {
this.socket?.send(phoenixMessage.toString())
this.replyQueue.set(phoenixMessage.messageId!, (reply) => resolve(reply))
phoenixTopic.replyQueue.set(phoenixMessage.messageId!, {
onReply: (reply) => resolve(reply),
onError: (err) => reject(err),
} as ReplyQueueEntry)
})
} else {
throw new PhoenixInvalidStateError()
Expand All @@ -425,20 +470,19 @@ export class PhoenixWebsocket {
topic.joinPayload
)
this.socket.send(message.toString())
this.replyQueue.set(message.messageId!, (reply) => {
if (reply.status === 'ok') {
topic.replyQueue.set(message.messageId!, {
onReply: (reply) => {
topic.status = TopicStatuses.Subscribed
topic.subscribedResolvers.forEach((r) => r())
topic.subscribedResolvers = []
} else if (reply.status === 'error') {
},
onError: (err) => {
topic.status = TopicStatuses.Unsubscribed
this.topics.delete(topic.topic)
topic.subscribedRejectors.forEach((r) => r(reply.response))
topic.subscribedRejectors.forEach((r) => r(err))
topic.subscribedRejectors = []
} else {
throw new Error('Phoenix Websocket encountered unexpected reply status: ' + reply.status)
}
})
},
} as ReplyQueueEntry)
topic.status = TopicStatuses.Joining
} else {
if (this.logLevel <= PhoenixWebsocketLogLevels.Errors) {
Expand Down Expand Up @@ -492,7 +536,10 @@ export class PhoenixWebsocket {
undefined
)
this.socket.send(newMessage.toString())
this.replyQueue.set(newMessage.messageId!, (_reply) => this.scheduleHeartbeat())
this.heartbeatReplyQueue.set(newMessage.messageId!, {
onReply: (_reply) => this.scheduleHeartbeat(),
onError: (_reply) => this.scheduleHeartbeat(),
} as ReplyQueueEntry)
this.heartbeatTimeout = undefined
}
}
Expand Down
44 changes: 44 additions & 0 deletions src/types/errors.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { PhoenixReply } from './reply'

/**
* Thrown when you try to interact with a topic that you are not currently subscribed to.
*/
Expand All @@ -23,3 +25,45 @@ export class PhoenixInvalidStateError extends Error {
)
}
}

/**
* Thrown when an unexpected network state or server error interrupts a topic connection.
*/
export class PhoenixConnectionError extends Error {
constructor(topic?: string) {
super(
topic
? `A connection error was encountered while trying to subscribe/interact with topic ${topic}.`
: `A connection error was encountered while trying to subscribe/interact with topic.`
)
}
}

/**
* Thrown when the server responds to a message with a server error that closes the topic.
*/
export class PhoenixInternalServerError extends Error {
constructor() {
super('The server encountered an internal error which forcibly closed the topic.')
}
}

/**
* Thrown when the server responds with an error reply.
* Differs from internal server error since this means the server successfully handled
* the request but explicitly returned an error.
*/
export class PhoenixRespondedWithError extends Error {
constructor(public reply?: PhoenixReply) {
super('The server responded with an error to the message.')
}
}

/**
* Thrown when attempting to interact with Phoenix after `disconnect()` has been called on the PhoenixWebsocket instance.
*/
export class PhoenixDisconnectedError extends Error {
constructor() {
super('Attempted to interact with Phoenix after the connection has been closed.')
}
}
16 changes: 16 additions & 0 deletions src/types/reply-queue-entry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { PhoenixOkReply, PhoenixReply } from './reply'
import {
PhoenixConnectionError,
PhoenixInternalServerError,
PhoenixRespondedWithError,
} from './errors'

/**
* A container object representing a promise waiting on a reply from the Phoenix server.
*/
export type ReplyQueueEntry = {
onReply: (reply: PhoenixOkReply) => void
onError: (
error: PhoenixConnectionError | PhoenixInternalServerError | PhoenixRespondedWithError
) => void
}
11 changes: 9 additions & 2 deletions src/types/reply.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
/**
* A container object representing a reply from the Phoenix server. Includes the status and an optional response payload.
*/
export type PhoenixReply = {
export type PhoenixReply = PhoenixOkReply | PhoenixErrorReply

export type PhoenixOkReply = {
response: { [key: string]: any } | string
status: 'ok' | 'error'
status: 'ok'
}

export type PhoenixErrorReply = {
response: { [key: string]: any } | string
status: 'error'
}
Loading

0 comments on commit bdf51f5

Please sign in to comment.