diff --git a/.changeset/twenty-rabbits-double.md b/.changeset/twenty-rabbits-double.md new file mode 100644 index 000000000..83ad29dd0 --- /dev/null +++ b/.changeset/twenty-rabbits-double.md @@ -0,0 +1,5 @@ +--- +"@typed/fx": patch +--- + +Improve Subject error handling diff --git a/packages/fx/src/Subject.ts b/packages/fx/src/Subject.ts index c0be4c257..7148c1954 100644 --- a/packages/fx/src/Subject.ts +++ b/packages/fx/src/Subject.ts @@ -78,8 +78,7 @@ const DISCARD = { discard: true } as const * @internal */ export class SubjectImpl extends FxBase implements Subject { - protected sinks: Set, Context.Context]> = new Set() - protected scopes: Set = new Set() + protected sinks: Set, Context.Context, Scope.CloseableScope]> = new Set() constructor() { super() @@ -101,16 +100,17 @@ export class SubjectImpl extends FxBase implements Subj return this.onEvent(a) } - readonly interrupt = Effect.fiberIdWith((id) => + protected interruptScopes = Effect.fiberIdWith((id) => Effect.tap( - Effect.forEach(this.scopes, (scope) => Scope.close(scope, Exit.interrupt(id)), DISCARD), + Effect.forEach(this.sinks, ([, , scope]) => Scope.close(scope, Exit.interrupt(id)), DISCARD), () => { this.sinks.clear() - this.scopes.clear() } ) ) + readonly interrupt = this.interruptScopes + protected addSink( sink: Sink, f: (scope: Scope.Scope) => Effect.Effect @@ -118,19 +118,13 @@ export class SubjectImpl extends FxBase implements Subj return withScope( (innerScope) => Effect.contextWithEffect((ctx) => { - const entry = [sink, ctx] as const - const add = Effect.sync(() => { - this.sinks.add(entry) - this.scopes.add(innerScope) - }) - const remove = Effect.sync(() => { - this.sinks.delete(entry) - this.scopes.delete(innerScope) - }) + const entry = [sink, ctx, innerScope] as const + this.sinks.add(entry) + const remove = Effect.sync(() => this.sinks.delete(entry)) return Effect.zipRight( Scope.addFinalizer(innerScope, remove), - Effect.zipRight(add, f(innerScope)) + f(innerScope) ) }), ExecutionStrategy.sequential @@ -141,20 +135,41 @@ export class SubjectImpl extends FxBase implements Subj protected onEvent(a: A) { if (this.sinks.size === 0) return Effect.unit - else { - return Effect.forEach(this.sinks, ([sink, ctx]) => Effect.provide(sink.onSuccess(a), ctx), DISCARD) + else if (this.sinks.size === 1) { + const [sink, ctx] = this.sinks.values().next().value + return runSinkEvent(sink, ctx, a) + } else { + return Effect.forEach( + this.sinks, + ([sink, ctx]) => runSinkEvent(sink, ctx, a), + DISCARD + ) } } protected onCause(cause: Cause.Cause) { - return Effect.forEach( - this.sinks, - ([sink, ctx]) => Effect.provide(sink.onFailure(cause), ctx), - DISCARD - ) + if (this.sinks.size === 0) return Effect.unit + else if (this.sinks.size === 1) { + const [sink, ctx] = this.sinks.values().next().value + return runSinkCause(sink, ctx, cause) + } else { + return Effect.forEach( + this.sinks, + ([sink, ctx]) => runSinkCause(sink, ctx, cause), + DISCARD + ) + } } } +function runSinkEvent(sink: Sink, ctx: Context.Context, a: A) { + return Effect.provide(Effect.catchAllCause(sink.onSuccess(a), sink.onFailure), ctx) +} + +function runSinkCause(sink: Sink, ctx: Context.Context, cause: Cause.Cause) { + return Effect.provide(Effect.catchAllCause(sink.onFailure(cause), () => Effect.unit), ctx) +} + /** * @internal */ @@ -185,11 +200,9 @@ export class HoldSubjectImpl extends SubjectImpl implements Subject< })) } - readonly interrupt = Effect.fiberIdWith((id) => - Effect.tap( - Effect.forEach(this.scopes, (scope) => Scope.close(scope, Exit.interrupt(id)), DISCARD), - () => MutableRef.set(this.lastValue, Option.none()) - ) + readonly interrupt = Effect.tap( + this.interruptScopes, + () => MutableRef.set(this.lastValue, Option.none()) ) } @@ -216,11 +229,9 @@ export class ReplaySubjectImpl extends SubjectImpl { ) } - readonly interrupt = Effect.fiberIdWith((id) => - Effect.tap( - Effect.forEach(this.scopes, (scope) => Scope.close(scope, Exit.interrupt(id)), DISCARD), - () => this.buffer.clear() - ) + readonly interrupt = Effect.tap( + this.interruptScopes, + () => this.buffer.clear() ) }