Skip to content

Commit

Permalink
DAG reprocessing to restore lachesis roots
Browse files Browse the repository at this point in the history
  • Loading branch information
rus-alex committed Aug 14, 2023
1 parent 2f009c3 commit 5af2739
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 16 deletions.
12 changes: 9 additions & 3 deletions cmd/opera/launcher/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"gopkg.in/urfave/cli.v1"

"github.com/Fantom-foundation/go-opera/gossip"
"github.com/Fantom-foundation/go-opera/integration"
"github.com/Fantom-foundation/go-opera/utils/dag"
"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
)
Expand Down Expand Up @@ -84,7 +85,7 @@ func exportEvents(ctx *cli.Context) error {
switch {
// DOT format:
case strings.HasSuffix(fn, ".dot"):
err = exportDOT(writer, gdb, from, to)
err = exportDOT(writer, gdb, cfg, from, to)
if err != nil {
utils.Fatalf("Export DOT error: %v\n", err)
}
Expand Down Expand Up @@ -134,8 +135,13 @@ func exportRLP(w io.Writer, gdb *gossip.Store, from, to idx.Epoch) (err error) {
}

// exportDOT writer the active chain.
func exportDOT(writer io.Writer, gdb *gossip.Store, from, to idx.Epoch) (err error) {
graph := dag.Graph(gdb, from, to)
func exportDOT(writer io.Writer, gdb *gossip.Store, cfg *config, from, to idx.Epoch) (err error) {
consensusCfg := integration.Configs{
Lachesis: cfg.Lachesis,
VectorClock: cfg.VectorClock,
}

graph := dag.Graph(gdb, consensusCfg, from, to)
buf, err := dot.Marshal(graph, "DAG", "", "\t")
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions utils/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import (
"gonum.org/v1/gonum/graph/encoding/dot"

"github.com/Fantom-foundation/go-opera/gossip"
"github.com/Fantom-foundation/go-opera/integration"
)

func Graph(db *gossip.Store, from, to idx.Epoch) dot.Graph {
func Graph(db *gossip.Store, cfg integration.Configs, from, to idx.Epoch) dot.Graph {
/* g:= &dagReader{
db: db,
epochFrom: from,
epochTo: to,
}*/

g := newDagLoader(db, from, to)
g := newDagLoader(db, cfg, from, to)

return g
}
125 changes: 114 additions & 11 deletions utils/dag/loader.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,139 @@
package dag

import (
"fmt"

"github.com/Fantom-foundation/lachesis-base/abft"
"github.com/Fantom-foundation/lachesis-base/gossip/dagordering"
"github.com/Fantom-foundation/lachesis-base/hash"
"github.com/Fantom-foundation/lachesis-base/inter/dag"
"github.com/Fantom-foundation/lachesis-base/inter/idx"
"github.com/Fantom-foundation/lachesis-base/inter/pos"
"github.com/ethereum/go-ethereum/log"
"gonum.org/v1/gonum/graph"

"github.com/Fantom-foundation/go-opera/gossip"
"github.com/Fantom-foundation/go-opera/integration"
"github.com/Fantom-foundation/go-opera/inter"
"github.com/Fantom-foundation/go-opera/utils/adapters/vecmt2dagidx"
"github.com/Fantom-foundation/go-opera/vecmt"
)

type dagLoader struct {
refs []hash.Event
nodes map[hash.Event]*dagNode
}

func newDagLoader(db *gossip.Store, from, to idx.Epoch) *dagLoader {
func newDagLoader(gdb *gossip.Store, cfg integration.Configs, from, to idx.Epoch) *dagLoader {
g := &dagLoader{
refs: make([]hash.Event, 0, 2000000),
nodes: make(map[hash.Event]*dagNode),
}

db.ForEachEvent(from, func(e *inter.EventPayload) bool {
store := abft.NewMemStore()
defer store.Close()
// ApplyGenesis()
store.SetEpochState(&abft.EpochState{
Epoch: from,
})
store.SetLastDecidedState(&abft.LastDecidedState{
LastDecidedFrame: abft.FirstFrame - 1,
})

dagIndexer := vecmt.NewIndex(panics("Vector clock"), cfg.VectorClock)
orderer := abft.NewOrderer(
store,
&integration.GossipStoreAdapter{gdb},
vecmt2dagidx.Wrap(dagIndexer),
panics("Lachesis"),
cfg.Lachesis)

var (
epoch idx.Epoch
vv = make(pos.ValidatorsBuilder, 60)
ee = make(map[hash.Event]dag.Event, 1000)
)
err := orderer.Bootstrap(abft.OrdererCallbacks{
ApplyAtropos: func(decidedFrame idx.Frame, atropos hash.Event) (sealEpoch *pos.Validators) {
return nil
},
})
if err != nil {
panic(err)
}
buffer := dagordering.New(
cfg.Opera.Protocol.DagProcessor.EventsBufferLimit,
dagordering.Callback{
Process: func(e dag.Event) error {
fmt.Printf("<<< E(%d - %d) %s \n", e.Epoch(), e.Frame(), e.ID().String())
err = dagIndexer.Add(e)
if err != nil {
panic(err)
}
dagIndexer.Flush()
orderer.Process(e)

id := len(g.refs)
g.refs = append(g.refs, e.ID())
g.nodes[e.ID()] = &dagNode{
id: int64(id),
hash: e.ID(),
parents: e.Parents(),
}
return nil
},
Released: func(e dag.Event, peer string, err error) {
// panic(err)
},
Get: func(id hash.Event) dag.Event {
return ee[id]
},
Exists: func(id hash.Event) bool {
_, ok := ee[id]
return ok
},
})

gdb.ForEachEvent(from, func(e *inter.EventPayload) bool {
// current epoch is finished
// , so process accumulated events
if epoch < e.Epoch() {
epoch = e.Epoch()

validators := vv.Build()
err := orderer.Reset(epoch, validators)
if err != nil {
panic(err)
}

for _, e := range ee {
fmt.Printf(">>> E(%d - %d) %s \n", e.Epoch(), e.Frame(), e.ID().String())
buffer.PushEvent(e, "")
}

for f := idx.Frame(0); f <= store.GetLastDecidedFrame(); f++ {
rr := store.GetFrameRoots(f)
for _, r := range rr {
g.nodes[r.ID].isRoot = true
}
}

// reset
vv = make(pos.ValidatorsBuilder, 60)
ee = make(map[hash.Event]dag.Event, 1000)
}
// break after last epoch
if to >= from && e.Epoch() > to {
return false
}

id := len(g.refs)
g.refs = append(g.refs, e.ID())
g.nodes[e.ID()] = &dagNode{
id: int64(id),
hash: e.ID(),
parents: e.Parents(),
}
// accumulate epoch's events and validators
ee[e.ID()] = e
vv.Set(e.Creator(), 1)

return true
})

db.ForEachBlock(func(index idx.Block, block *inter.Block) {
gdb.ForEachBlock(func(index idx.Block, block *inter.Block) {
node, exists := g.nodes[block.Atropos]
if exists {
node.isAtropos = true
Expand Down Expand Up @@ -164,3 +261,9 @@ func (g *dagLoader) Edge(uid, vid int64) graph.Edge {

return nil
}

func panics(name string) func(error) {
return func(err error) {
log.Crit(fmt.Sprintf("%s error", name), "err", err)
}
}
10 changes: 10 additions & 0 deletions utils/dag/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type dagNode struct {
id int64
hash hash.Event
parents hash.Events
isRoot bool
isAtropos bool
}

Expand All @@ -52,6 +53,15 @@ func (n *dagNode) Attributes() []encoding.Attribute {
},
}

if n.isRoot {
aa = append(aa,
encoding.Attribute{
Key: "role",
Value: "Root",
},
)
}

if n.isAtropos {
aa = append(aa,
encoding.Attribute{
Expand Down

0 comments on commit 5af2739

Please sign in to comment.