Skip to content

Commit

Permalink
optional duckpaths channel
Browse files Browse the repository at this point in the history
  • Loading branch information
loicalleyne committed Feb 19, 2025
1 parent 2dc1584 commit 5a13928
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
4 changes: 3 additions & 1 deletion duck.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ func (o *Orchestrator[T]) DuckIngestWithRotate(ctx context.Context, w *sync.Wait
}
if o.duckConf.runner == nil || (o.duckConf.runner != nil && !o.duckConf.runner.IsDeleteDBOnDone()) {
o.duckConf.quack.Close()
o.duckPaths <- o.duckConf.quack.Path()
if o.opt.withDuckPathsChan {
o.duckPaths <- o.duckConf.quack.Path()
}
}
if !(o.rChanClosed && o.rChanRecs.Load() == 0) {
o.configureDuck(o.duckConf)
Expand Down
9 changes: 8 additions & 1 deletion quacfka.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Opt struct {
withoutProc bool
withoutDuck bool
withoutDuckIngestRaw bool
withDuckPathsChan bool
fileRotateThresholdMB int64
customArrow []CustomArrow
normalizerFieldStrings []string
Expand Down Expand Up @@ -57,6 +58,12 @@ func WithoutDuck() Option {
}
}

func WithDuckPathsChan() Option {
return func(cfg config) {
cfg.withDuckPathsChan = true
}
}

func WithCustomArrows(p []CustomArrow) Option {
return func(cfg config) {
for _, c := range p {
Expand Down Expand Up @@ -151,7 +158,7 @@ func NewOrchestrator[T proto.Message](opts ...Option) (*Orchestrator[T], error)
o.rowGroupSizeMultiplier = 1
o.msgProcessorsCount.Store(1)
o.duckConnCount.Store(1)
o.duckPaths = make(chan string, 10)
o.duckPaths = make(chan string, 10000)
o.NewMetrics()
return o, nil
}
Expand Down

0 comments on commit 5a13928

Please sign in to comment.