Skip to content

Commit

Permalink
duckmetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
loicalleyne committed Feb 21, 2025
1 parent 5a13928 commit 1f4ef30
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 13 deletions.
10 changes: 8 additions & 2 deletions duck.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (o *Orchestrator[T]) configureDuck(d *duckConf) error {
if err != nil {
return fmt.Errorf("duckdb config error: %w", err)
}
o.Metrics.duckFiles.Add(1)
o.Metrics.duckMetrics.duckFiles.Add(1)
return nil
}

Expand All @@ -229,14 +229,18 @@ func (o *Orchestrator[T]) DuckIngestWithRotate(ctx context.Context, w *sync.Wait
for !(o.rChanClosed && o.rChanRecs.Load() == 0) {
if o.rChanRecs.Load() > 0 {
rwg.Add(1)
o.Metrics.duckMetrics.duckStart()
go o.DuckIngest(context.Background(), &rwg)
rwg.Wait()
if debugLog != nil {
debugLog("db size: %d\n", o.CurrentDBSize())
}
o.Metrics.recordBytes.Store(0)
// record cumulative size of duckdb files
o.Metrics.duckFilesSizeMB.Add(o.CurrentDBSize())
duckDBSize := o.CurrentDBSize()
o.Metrics.duckMetrics.duckFilesSizeMB.Add(duckDBSize)

// runner to run queries on db before close
if o.duckConf.runner != nil {
err := o.duckConf.runner.Run(ctx)
if err != nil {
Expand All @@ -248,10 +252,12 @@ func (o *Orchestrator[T]) DuckIngestWithRotate(ctx context.Context, w *sync.Wait
}
if o.duckConf.runner == nil || (o.duckConf.runner != nil && !o.duckConf.runner.IsDeleteDBOnDone()) {
o.duckConf.quack.Close()
// send duckPath to channel for external consumption
if o.opt.withDuckPathsChan {
o.duckPaths <- o.duckConf.quack.Path()
}
}
o.Metrics.duckMetrics.duckStop(duckDBSize)
if !(o.rChanClosed && o.rChanRecs.Load() == 0) {
o.configureDuck(o.duckConf)
}
Expand Down
55 changes: 44 additions & 11 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

json "github.com/goccy/go-json"
"github.com/spf13/cast"
)

type Metrics struct {
Expand All @@ -21,8 +22,7 @@ type Metrics struct {
normRecordsInserted atomic.Int64
totalBytes atomic.Int64
recordBytes atomic.Int64
duckFiles atomic.Int64
duckFilesSizeMB atomic.Int64
duckMetrics duckMetrics

maxMChanLen atomic.Int32
maxRChanLen atomic.Int32
Expand All @@ -41,11 +41,38 @@ type Metrics struct {
endTimeUnix atomic.Int64
}

type duckMetrics struct {
duckFileStart time.Time
duckFileWriteDurationNanosAvg time.Duration
duckFileDurations []time.Duration
duckFiles atomic.Int64
duckFilesSizeMB atomic.Int64
duckFilesSizesMB []int64
}

func (d *duckMetrics) duckStart() { d.duckFileStart = time.Now() }
func (d *duckMetrics) duckStop(dbSizeMB int64) {
duration := time.Since(d.duckFileStart)
if len(d.duckFileDurations) >= 100 {
d.duckFileDurations = append(d.duckFileDurations[24:], duration)
d.duckFilesSizesMB = append(d.duckFilesSizesMB[24:], dbSizeMB)
} else {
d.duckFileDurations = append(d.duckFileDurations, duration)
d.duckFilesSizesMB = append(d.duckFilesSizesMB, dbSizeMB)
}

var sumDurations int64
for _, d := range d.duckFileDurations {
sumDurations = sumDurations + d.Nanoseconds()
}
d.duckFileWriteDurationNanosAvg = cast.ToDuration(sumDurations / int64(len(d.duckFileDurations)))
}

func (o *Orchestrator[T]) NewMetrics() {
o.Metrics = new(Metrics)
o.Metrics.numCPU.Store(int32(runtime.NumCPU()))
o.Metrics.runtimeOS = runtime.GOOS
o.Metrics.duckFiles.Store(1)
o.Metrics.duckMetrics.duckFiles.Store(1)
}

func (o *Orchestrator[T]) StartMetrics() {
Expand All @@ -65,8 +92,8 @@ func (o *Orchestrator[T]) ResetMetrics() {
o.Metrics.normRecordsInserted.Store(0)
o.Metrics.totalBytes.Store(0)
o.Metrics.recordBytes.Store(0)
o.Metrics.duckFiles.Store(0)
o.Metrics.duckFilesSizeMB.Store(0)
o.Metrics.duckMetrics.duckFiles.Store(0)
o.Metrics.duckMetrics.duckFilesSizeMB.Store(0)
o.Metrics.startTime = time.Now()
}

Expand Down Expand Up @@ -166,8 +193,8 @@ func (o *Orchestrator[T]) generateBenchmarksReport() MetricsReport {
throughput := float64(o.Metrics.throughput.Load()) / 100
totalThroughput := float64(o.Metrics.totalThroughput.Load()) / 100
throughputBytes := o.Metrics.throughputBytes.Load()
duckFiles := o.Metrics.duckFiles.Load()
duckFilesSize := o.Metrics.duckFilesSizeMB.Load()
duckFiles := o.Metrics.duckMetrics.duckFiles.Load()
duckFilesSize := o.Metrics.duckMetrics.duckFilesSizeMB.Load()
return MetricsReport{
NumCPU: int(o.Metrics.numCPU.Load()),
RuntimeOS: o.Metrics.runtimeOS,
Expand All @@ -185,6 +212,7 @@ func (o *Orchestrator[T]) generateBenchmarksReport() MetricsReport {
TransferRate: formatThroughputBytes(float64(throughputBytes)),
OutputFiles: duckFiles,
OutputFilesMB: duckFilesSize,
AvgDurationPerFile: formatDuration(o.Metrics.duckMetrics.duckFileWriteDurationNanosAvg),
}
}

Expand Down Expand Up @@ -233,6 +261,7 @@ type MetricsReport struct {
TransferRate string `json:"transfer_rate"`
OutputFiles int64 `json:"duckdb_files"`
OutputFilesMB int64 `json:"duckdb_files_MB"`
AvgDurationPerFile string `json:"file_avg_duration,omitempty"`
}

type TypedMetricsReport struct {
Expand All @@ -256,6 +285,7 @@ type TypedMetricsReport struct {
TransferRate string `json:"transfer_rate"`
OutputFiles int64 `json:"duckdb_files"`
OutputFilesMB int64 `json:"duckdb_files_MB"`
AvgDurationPerFile float64 `json:"file_avg_duration,omitzero"`
}

func (o *Orchestrator[T]) generateMetricsReport() MetricsReport {
Expand All @@ -275,8 +305,9 @@ func (o *Orchestrator[T]) generateMetricsReport() MetricsReport {
customArrowCount := len(o.opt.customArrow)
customArrows = &customArrowCount
}
duckFiles := o.Metrics.duckFiles.Load()
duckFilesSize := o.Metrics.duckFilesSizeMB.Load()
duckFiles := o.Metrics.duckMetrics.duckFiles.Load()
duckFilesSize := o.Metrics.duckMetrics.duckFilesSizeMB.Load()

return MetricsReport{
NumCPU: int(o.Metrics.numCPU.Load()),
RuntimeOS: o.Metrics.runtimeOS,
Expand All @@ -298,6 +329,7 @@ func (o *Orchestrator[T]) generateMetricsReport() MetricsReport {
TransferRate: formatThroughputBytes(float64(throughputBytes)),
OutputFiles: duckFiles,
OutputFilesMB: duckFilesSize,
AvgDurationPerFile: formatDuration(o.Metrics.duckMetrics.duckFileWriteDurationNanosAvg),
}
}

Expand All @@ -318,8 +350,8 @@ func (o *Orchestrator[T]) generateUnformatedMetricsReport() TypedMetricsReport {
customArrowCount := len(o.opt.customArrow)
customArrows = &customArrowCount
}
duckFiles := o.Metrics.duckFiles.Load()
duckFilesSize := o.Metrics.duckFilesSizeMB.Load()
duckFiles := o.Metrics.duckMetrics.duckFiles.Load()
duckFilesSize := o.Metrics.duckMetrics.duckFilesSizeMB.Load()
return TypedMetricsReport{
NumCPU: int(o.Metrics.numCPU.Load()),
RuntimeOS: o.Metrics.runtimeOS,
Expand All @@ -341,6 +373,7 @@ func (o *Orchestrator[T]) generateUnformatedMetricsReport() TypedMetricsReport {
TransferRate: formatThroughputBytes(float64(throughputBytes)),
OutputFiles: duckFiles,
OutputFilesMB: duckFilesSize,
AvgDurationPerFile: o.Metrics.duckMetrics.duckFileWriteDurationNanosAvg.Seconds(),
}
}

Expand Down

0 comments on commit 1f4ef30

Please sign in to comment.