Skip to content

Commit

Permalink
fixup! feat(shed): actor state diff stats
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Jan 7, 2025
1 parent 46d3d03 commit 33faf67
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 59 deletions.
83 changes: 59 additions & 24 deletions cmd/lotus-shed/state-stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/printer"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -50,14 +53,19 @@ type actorStats struct {
Actor *types.Actor
Fields []fieldItem
Stats api.ObjStat
Blocks []cid.Cid `json:",omitempty"`
Blocks []blockRepr `json:",omitempty"`
}

// fieldItem is one entry of actorStats.Fields: the statistics gathered by
// collectStats for a single named field.
type fieldItem struct {
Name string
Cid cid.Cid
// Stats is copied from the per-field dagStatCollector after its walk.
Stats api.ObjStat
Blocks []cid.Cid `json:",omitempty"`
// Blocks is only populated when block collection (collectCids) is enabled.
Blocks []blockRepr `json:",omitempty"`
}

// blockRepr identifies a single block by CID and optionally carries a
// human-readable rendering of its contents.
type blockRepr struct {
// Cid is the CID of the recorded block.
Cid cid.Cid
// Representation is the printed form of the decoded block. It is left empty,
// and therefore omitted from JSON output, unless block printing was requested.
Representation string `json:",omitempty"`
}

type job struct {
Expand Down Expand Up @@ -126,13 +134,14 @@ func (cng *cacheNodeGetter) GetMany(ctx context.Context, list []cid.Cid) <-chan
}

// dagStatCollector accumulates size and link statistics for the blocks passed
// to record and, when collectCids is set, a list of those blocks.
type dagStatCollector struct {
ds format.NodeGetter
walk func(format.Node) ([]*format.Link, error)
collectCids bool
ds format.NodeGetter
walk func(format.Node) ([]*format.Link, error)
collectCids bool
// representBlocks only has an effect when collectCids is also set: record
// then decodes each block as DAG-CBOR and stores its printed form.
representBlocks bool

// NOTE(review): statsLk presumably guards the fields below; the locking
// itself is not visible in this view — confirm.
statsLk sync.Mutex
stats api.ObjStat
cids []cid.Cid
blocks []blockRepr
}

func (dsc *dagStatCollector) record(c cid.Cid, nd format.Node) error {
Expand All @@ -147,7 +156,15 @@ func (dsc *dagStatCollector) record(c cid.Cid, nd format.Node) error {
dsc.stats.Size = dsc.stats.Size + size
dsc.stats.Links = dsc.stats.Links + 1
if dsc.collectCids {
dsc.cids = append(dsc.cids, c)
br := blockRepr{Cid: c}
if dsc.representBlocks {
node, err := ipld.Decode(nd.RawData(), dagcbor.Decode)
if err != nil {
return xerrors.Errorf("decoding node: %w", err)
}
br.Representation = printer.Config{OmitScalarValues: true}.Sprint(node)
}
dsc.blocks = append(dsc.blocks, br)
}

return nil
Expand Down Expand Up @@ -645,6 +662,10 @@ the total state of the actor in either tipset.
Name: "list-blocks",
Usage: "list the CIDs of blocks in the stat set being processed, in the case of a diff-tipset this will be the blocks that are different between the two tipsets",
},
&cli.BoolFlag{
Name: "print-blocks",
Usage: "provide a human-readable representation of each of the blocks in the stat set being processed, implies --list-blocks",
},
&cli.IntFlag{
Name: "workers",
Usage: "number of workers to use when processing",
Expand Down Expand Up @@ -708,12 +729,16 @@ the total state of the actor in either tipset.

numWorkers := cctx.Int("workers")
dagCacheSize := cctx.Int("dag-cache-size")
listBlocks := cctx.Bool("list-blocks") && !cctx.IsSet("diff-tipset") // if diff, don't list on first pass
printBlocks := cctx.Bool("print-blocks")
listBlocks := (printBlocks || cctx.Bool("list-blocks")) && !cctx.IsSet("diff-tipset") // if diff, don't list on first pass

jobs := make(chan address.Address, numWorkers)
results := make(chan actorStats, numWorkers)

sc := &statCollector{}
sc := &statCollector{
collectCids: listBlocks,
representBlocks: printBlocks,
}

worker := func(ctx context.Context, id int, ts *types.TipSet) error {
completed := 0
Expand Down Expand Up @@ -742,7 +767,7 @@ the total state of the actor in either tipset.
}
}

actStats, err := sc.collectStats(ctx, addr, actor, dag, listBlocks)
actStats, err := sc.collectStats(ctx, addr, actor, dag)
if err != nil {
return err
}
Expand Down Expand Up @@ -805,7 +830,8 @@ the total state of the actor in either tipset.

jobs = make(chan address.Address, numWorkers)
results = make(chan actorStats, numWorkers)
listBlocks = cctx.Bool("list-blocks")
listBlocks = cctx.Bool("list-blocks") || printBlocks
sc.collectCids = listBlocks

eg, egctx = errgroup.WithContext(ctx)
for w := 0; w < numWorkers; w++ {
Expand Down Expand Up @@ -938,11 +964,18 @@ func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter,
}

// statCollector carries the settings and CID sets that collectStats uses; a
// single instance is shared by the workers that call collectStats.
type statCollector struct {
rootCidSet *cid.Set
fieldCidSets map[string]*cid.Set
// rootCidSet's Visit method is the visit callback for the walk from each
// actor's head.
rootCidSet *cid.Set
// collectCids and representBlocks are copied into every dagStatCollector
// that collectStats creates.
collectCids bool
representBlocks bool
// NOTE(review): presumably one visited-set per field name; its use is not
// visible in this view — confirm.
fieldCidSets map[string]*cid.Set
}

func (sc *statCollector) collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter, collectCids bool) (actorStats, error) {
func (sc *statCollector) collectStats(
ctx context.Context,
addr address.Address,
actor *types.Actor,
dag format.NodeGetter,
) (actorStats, error) {
log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))

nd, err := dag.Get(ctx, actor.Head)
Expand Down Expand Up @@ -993,25 +1026,27 @@ func (sc *statCollector) collectStats(ctx context.Context, addr address.Address,
}

dsc := &dagStatCollector{
ds: dag,
walk: carWalkFunc,
collectCids: collectCids,
ds: dag,
walk: carWalkFunc,
collectCids: sc.collectCids,
representBlocks: sc.representBlocks,
}

if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, sc.rootCidSet.Visit, merkledag.Concurrent()); err != nil {
return actorStats{}, err
}

actStats.Stats = dsc.stats
if collectCids {
actStats.Blocks = dsc.cids
if sc.collectCids {
actStats.Blocks = dsc.blocks
}

for _, field := range fields {
dsc := &dagStatCollector{
ds: dag,
walk: carWalkFunc,
collectCids: collectCids,
ds: dag,
walk: carWalkFunc,
collectCids: sc.collectCids,
representBlocks: sc.representBlocks,
}

if err := merkledag.Walk(ctx, func(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
Expand All @@ -1022,8 +1057,8 @@ func (sc *statCollector) collectStats(ctx context.Context, addr address.Address,
}

field.Stats = dsc.stats
if collectCids {
field.Blocks = dsc.cids
if sc.collectCids {
field.Blocks = dsc.blocks
}

actStats.Fields = append(actStats.Fields, field)
Expand Down
19 changes: 8 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ require (
github.com/ipfs/go-metrics-prometheus v0.0.2
github.com/ipld/go-car v0.6.2
github.com/ipld/go-car/v2 v2.13.1
github.com/ipld/go-ipld-prime v0.21.0
github.com/ipld/go-ipld-prime v0.21.1-0.20250107052906-99e47bae5856
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jpillora/backoff v1.0.0
github.com/kelseyhightower/envconfig v1.4.0
Expand All @@ -126,7 +126,7 @@ require (
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-varint v0.0.7
github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333
github.com/polydawn/refmt v0.89.0
github.com/polydawn/refmt v0.89.1-0.20231129105047-37766d95467a
github.com/prometheus/client_golang v1.20.5
github.com/puzpuzpuz/xsync/v2 v2.4.0
github.com/raulk/clock v1.1.0
Expand Down Expand Up @@ -245,7 +245,6 @@ require (
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
github.com/jackc/pgx/v5 v5.6.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
Expand All @@ -264,7 +263,7 @@ require (
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/lucasb-eyer/go-colorful v1.0.3 // indirect
github.com/magefile/mage v1.9.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
Expand Down Expand Up @@ -315,15 +314,13 @@ require (
github.com/prometheus/statsd_exporter v0.22.7 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.48.2 // indirect
github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 // indirect; dependency-check-ignore: unknown
github.com/rivo/uniseg v0.4.7 // indirect
github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v2.18.12+incompatible // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/smartystreets/assertions v1.13.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.0.1 // indirect
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
Expand All @@ -337,12 +334,12 @@ require (
github.com/zondax/ledger-go v0.14.3 // indirect
gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b // indirect
gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 // indirect
go.dedis.ch/kyber/v4 v4.0.0-pre2.0.20240924132404-4de33740016e // indirect; dependency-check-ignore: unknown
go.dedis.ch/kyber/v4 v4.0.0-pre2.0.20240924132404-4de33740016e // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.18.0 // indirect
go.uber.org/mock v0.5.0 // indirect
go4.org v0.0.0-20230225012048-214862532bf5 // indirect
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/text v0.21.0 // indirect
gonum.org/v1/gonum v0.15.0 // indirect
Expand All @@ -352,7 +349,7 @@ require (
gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
howett.net/plist v0.0.0-20181124034731-591f970eefbb // indirect; dependency-check-ignore: required by github.com/elastic/go-sysinfo
howett.net/plist v0.0.0-20181124034731-591f970eefbb // indirect
lukechampine.com/blake3 v1.3.0 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
Loading

0 comments on commit 33faf67

Please sign in to comment.