Skip to content

Commit

Permalink
implement all proxy methods
Browse files Browse the repository at this point in the history
  • Loading branch information
felipecsl committed Dec 11, 2023
1 parent 65be09d commit 232e91a
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 101 deletions.
14 changes: 14 additions & 0 deletions .eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,19 @@ module.exports = {
extends: ["eslint:recommended", "plugin:@typescript-eslint/recommended"],
parser: "@typescript-eslint/parser",
plugins: ["@typescript-eslint"],
rules: {
"@typescript-eslint/switch-exhaustiveness-check": "error",
"@typescript-eslint/no-explicit-any": "warn",
"@typescript-eslint/no-unused-vars": "warn"
},
ignorePatterns: ["src/test/**/*"],
overrides: [
{
files: ['*.ts', '*.tsx'],
parserOptions: {
project: ['./tsconfig.json'], // Specify it only for TypeScript files
},
},
],
root: true,
};
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@
"@types/jest": "^29.4.0",
"@types/node-fetch": "^2.6.4",
"@types/ws": "^8.5.5",
"@typescript-eslint/eslint-plugin": "^5.59.9",
"@typescript-eslint/parser": "^5.59.9",
"@typescript-eslint/eslint-plugin": "^6.13.2",
"@typescript-eslint/parser": "^6.13.2",
"dotenv": "^16.1.4",
"esbuild": "^0.18.0",
"eslint": "^8.42.0",
Expand Down
141 changes: 94 additions & 47 deletions src/client/wsJsonClientProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,42 @@ import WebSocket from "isomorphic-ws";
import MulticastIterator from "obgen/multicastIterator";
import BufferedIterator from "obgen/bufferedIterator";
import { deferredWrap } from "obgen";
import { throwError } from "./util";
import debug from "debug";
import { ChannelState } from "./realWsJsonClient";
import { isString } from "lodash";
import Observable from "obgen/observable";

const logger = debug("wsClientProxy");

export const ALL_REQUESTS = [
"authenticate",
"optionChainQuotes",
"disconnect",
"quotes",
"accountPositions",
"chart",
"searchInstruments",
"lookupAlerts",
"optionChain",
"optionChainQuotes",
"optionChainDetails",
"optionQuotes",
"placeOrder",
"replaceOrder",
"workingOrders",
"createAlert",
"cancelAlert",
"cancelOrder",
"userProperties",
"marketDepth",
"watchlist",
] as const;
type RequestType = typeof ALL_REQUESTS;
type Request = RequestType[number];
export type Request = RequestType[number];

export type ProxiedRequest = {
request: Request;
args?: any[];
args?: any;
};

export type ProxiedResponse = ProxiedRequest & { response: unknown };
Expand All @@ -70,10 +88,6 @@ export default class WsJsonClientProxy implements WsJsonClient {
private readonly options?: any
) {}

accountPositions(_: string): AsyncIterable<PositionsResponse> {
throwError("not implemented");
}

async authenticate(
accessToken: string
): Promise<RawLoginResponseBody | null> {
Expand Down Expand Up @@ -114,27 +128,33 @@ export default class WsJsonClientProxy implements WsJsonClient {
private doAuthenticate(
accessToken: string
): Promise<RawLoginResponseBody | null> {
this.sendMessage({ request: "authenticate", args: [accessToken] });
return deferredWrap(() => this.iterator)
.filter(({ request }) => request === "authenticate")
.map(({ response }) => response)
.promise() as Promise<RawLoginResponseBody | null>;
return this.dispatch<RawLoginResponseBody | null>(
"authenticate",
accessToken
).promise();
}

cancelAlert(_: number): Promise<CancelAlertResponse> {
throwError("not implemented");
accountPositions(accountNumber: string): AsyncIterable<PositionsResponse> {
return this.dispatch<PositionsResponse>(
"accountPositions",
accountNumber
).iterable();
}

cancelOrder(_: number): Promise<CancelOrderResponse> {
throwError("not implemented");
cancelAlert(alertId: number): Promise<CancelAlertResponse> {
return this.dispatch<CancelAlertResponse>("cancelAlert", alertId).promise();
}

chart(_: ChartRequestParams): AsyncIterable<ChartResponse> {
throwError("not implemented");
cancelOrder(orderId: number): Promise<CancelOrderResponse> {
return this.dispatch<CancelOrderResponse>("cancelOrder", orderId).promise();
}

createAlert(_: CreateAlertRequestParams): Promise<CreateAlertResponse> {
throwError("not implemented");
chart(request: ChartRequestParams): AsyncIterable<ChartResponse> {
return this.dispatch<ChartResponse>("chart", request).iterable();
}

createAlert(request: CreateAlertRequestParams): Promise<CreateAlertResponse> {
return this.dispatch<CreateAlertResponse>("createAlert", request).promise();
}

disconnect(): void {
Expand All @@ -156,71 +176,98 @@ export default class WsJsonClientProxy implements WsJsonClient {
}

lookupAlerts(): AsyncIterable<LookupAlertsResponse> {
throwError("not implemented");
return this.dispatch<LookupAlertsResponse>("lookupAlerts").iterable();
}

marketDepth(_: string): AsyncIterable<MarketDepthResponse> {
throwError("not implemented");
marketDepth(symbol: string): AsyncIterable<MarketDepthResponse> {
return this.dispatch<MarketDepthResponse>("marketDepth", symbol).iterable();
}

optionChain(_: string): Promise<OptionChainResponse> {
throwError("not implemented");
optionChain(symbol: string): Promise<OptionChainResponse> {
return this.dispatch<OptionChainResponse>("optionChain", symbol).promise();
}

optionChainDetails(
_: OptionChainDetailsRequest
request: OptionChainDetailsRequest
): Promise<OptionChainDetailsResponse> {
throwError("not implemented");
return this.dispatch<OptionChainDetailsResponse>(
"optionChainDetails",
request
).promise();
}

optionChainQuotes(symbol: string): AsyncIterable<OptionSeriesQuotesResponse> {
this.sendMessage({ request: "optionChainQuotes", args: [symbol] });
return deferredWrap(() => this.iterator)
.filter(({ request }) => request === "optionChainQuotes")
.map(({ response }) => response)
.iterable() as AsyncIterable<OptionSeriesQuotesResponse>;
return this.dispatch<OptionSeriesQuotesResponse>(
"optionChainQuotes",
symbol
).iterable();
}

optionQuotes(
_: OptionQuotesRequestParams
request: OptionQuotesRequestParams
): AsyncIterable<OptionQuotesResponse> {
throwError("not implemented");
return this.dispatch<OptionQuotesResponse>(
"optionQuotes",
request
).iterable();
}

placeOrder(
_: PlaceLimitOrderRequestParams
request: PlaceLimitOrderRequestParams
): Promise<PlaceOrderSnapshotResponse> {
throwError("not implemented");
return this.dispatch<PlaceOrderSnapshotResponse>(
"placeOrder",
request
).promise();
}

quotes(_: string[]): AsyncIterable<QuotesResponse> {
throwError("not implemented");
quotes(symbols: string[]): AsyncIterable<QuotesResponse> {
return this.dispatch<QuotesResponse>("quotes", symbols).iterable();
}

replaceOrder(
_: Required<PlaceLimitOrderRequestParams>
request: Required<PlaceLimitOrderRequestParams>
): Promise<OrderEventsResponse> {
throwError("not implemented");
return this.dispatch<OrderEventsResponse>(
"replaceOrder",
request
).promise();
}

searchInstruments(_: string): Promise<InstrumentSearchResponse> {
throwError("not implemented");
searchInstruments(query: string): Promise<InstrumentSearchResponse> {
return this.dispatch<InstrumentSearchResponse>(
"searchInstruments",
query
).promise();
}

userProperties(): Promise<UserPropertiesResponse> {
throwError("not implemented");
return this.dispatch<UserPropertiesResponse>("userProperties").promise();
}

watchlist(_: number): Promise<GetWatchlistResponse> {
throwError("not implemented");
watchlist(watchlistId: number): Promise<GetWatchlistResponse> {
return this.dispatch<GetWatchlistResponse>(
"watchlist",
watchlistId
).promise();
}

workingOrders(_: string): AsyncIterable<OrderEventsResponse> {
throwError("not implemented");
workingOrders(accountNumber: string): AsyncIterable<OrderEventsResponse> {
return this.dispatch<OrderEventsResponse>(
"workingOrders",
accountNumber
).iterable();
}

private sendMessage(request: ProxiedRequest) {
this.ensureConnected();
this.socket!.send(JSON.stringify(request));
}

private dispatch<T>(req: Request, args?: any): Observable<T> {
this.sendMessage({ request: req, args });
return deferredWrap<ProxiedResponse>(() => this.iterator)
.filter(({ request }) => request === req)
.map(({ response }) => response as T);
}
}
6 changes: 5 additions & 1 deletion src/example/testApp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ async function run() {
);
const { client } = await authClient.authenticateWithRetry(token);
const app = new TestApp(client);
await app.optionChainQuotes("ABNB");
await Promise.all([
app.quotes(["ABNB", "UBER"]),
app.accountPositions(),
app.optionChain("TSLA"),
]);
}

run().catch(console.error);
64 changes: 50 additions & 14 deletions src/server/wsJsonServerProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,64 @@ export default class WsJsonServerProxy implements Disposable {
private onClientMessage = async (data: string) => {
const msg = JSON.parse(data) as ProxiedRequest;
logger("⬅️\treceived %O", msg);
const { request, args } = msg;
const { request } = msg;
switch (request) {
case "authenticate": {
this.upstream = this.wsJsonClientFactory();
try {
const authResult = await this.upstream.authenticate(args![0]);
this.proxyResponse({ ...msg, response: authResult });
} catch (e) {
this.proxyResponse({ ...msg, response: e });
}
break;
}
case "optionChainQuotes": {
for await (const quote of this.upstream!.optionChainQuotes(args![0])) {
this.proxyResponse({ ...msg, response: quote });
}
break;
return await this.relayPromise(msg);
}
case "quotes":
case "accountPositions":
case "chart":
case "lookupAlerts":
case "optionChainQuotes":
case "optionQuotes":
case "workingOrders":
case "marketDepth":
return await this.relayIterable(msg);
case "createAlert":
case "cancelAlert":
case "cancelOrder":
case "searchInstruments":
case "userProperties":
case "watchlist":
case "optionChain":
case "optionChainDetails":
case "replaceOrder":
case "placeOrder":
return await this.relayPromise(msg);
case "disconnect":
return this.upstream!.disconnect();
}
};

private proxyResponse(msg: ProxiedResponse) {
this.downstream.send(JSON.stringify(msg));
}

private async relayPromise({ request, args }: ProxiedRequest) {
const upstream = this.ensureConnected();
try {
const response = await upstream[request](args as never);
this.proxyResponse({ request, args, response });
} catch (e) {
this.proxyResponse({ request, args, response: e });
}
}

private async relayIterable({ request, args }: ProxiedRequest) {
const upstream = this.ensureConnected();
for await (const response of upstream[request](
args as never
) as AsyncIterable<any>) {
this.proxyResponse({ request, args, response });
}
}

private ensureConnected(): WsJsonClient {
if (!this.upstream) {
throw new Error("Not connected");
}
return this.upstream!;
}
}
Loading

0 comments on commit 232e91a

Please sign in to comment.