-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathprocessor_node.go
46 lines (37 loc) · 1.17 KB
/
processor_node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package kstreams
import (
"context"
"errors"
)
// InputProcessor is a partial interface covering only the generic input K/V,
// without requiring the caller to know the generic types of the output.
type InputProcessor[K any, V any] interface {
Process(context.Context, K, V) error
}
var _ = InputProcessor[any, any](&ProcessorNode[any, any, any, any]{})
type ProcessorNode[Kin any, Vin any, Kout any, Vout any] struct {
userProcessor Processor[Kin, Vin, Kout, Vout]
processorContext *InternalProcessorContext[Kout, Vout]
}
func (p *ProcessorNode[Kin, Vin, Kout, Vout]) Process(ctx context.Context, k Kin, v Vin) error {
err := p.userProcessor.Process(ctx, k, v)
if err != nil {
return err
}
// FIXME this does not work. every node writes to the ctx and it's more or less random which node gets which error...
errz := p.processorContext.drainErrors()
if len(errz) > 0 {
var errs error
for _, err := range errz {
errs = errors.Join(errs, err)
}
return errs
}
return nil
}
func (p *ProcessorNode[Kin, Vin, Kout, Vout]) Init() error {
return p.userProcessor.Init(p.processorContext)
}
func (p *ProcessorNode[Kin, Vin, Kout, Vout]) Close() error {
return nil
}