Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicit push to the gateway #1561

Merged
merged 11 commits into from
Jan 11, 2024
2 changes: 1 addition & 1 deletion backend/cmd/mintter-site/sites/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func Load(ctx context.Context, address string, cfg config.Config, dir *storage.D
dbPromise := future.New[*sqlitex.Pool]()
blobsPromise := future.New[*hyper.Storage]()

site := NewServer(address, blobsPromise.ReadOnly, nodePromise.ReadOnly, dbPromise.ReadOnly)
site := NewServer(address, blobsPromise.ReadOnly, nodePromise.ReadOnly, dbPromise.ReadOnly, cfg.Syncing.AllowPush)

app, err := daemon.Load(ctx, cfg, dir, site, daemon.GenericHandler{
Path: "/.well-known/hypermedia-site",
Expand Down
66 changes: 34 additions & 32 deletions backend/cmd/mintter-site/sites/sites.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,19 @@ type Website struct {

once sync.Once
setupSecret string
allowPush bool
}

var errNodeNotReadyYet = errors.New("P2P node is not ready yet")

// NewServer creates a new server for the site.
func NewServer(url string, blobs *future.ReadOnly[*hyper.Storage], n *future.ReadOnly[*mttnet.Node], db *future.ReadOnly[*sqlitex.Pool]) *Website {
func NewServer(url string, blobs *future.ReadOnly[*hyper.Storage], n *future.ReadOnly[*mttnet.Node], db *future.ReadOnly[*sqlitex.Pool], allowPush bool) *Website {
return &Website{
blobs: blobs,
node: n,
db: db,
url: url,
blobs: blobs,
node: n,
db: db,
url: url,
allowPush: allowPush,
}
}
func (ws *Website) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -238,7 +240,7 @@ func (ws *Website) GetGroupID(ctx context.Context) (string, error) {
return groupID, nil
}

// PublishBlobs publish blobs to the website.
// PublishBlobs publishes blobs to the website.
func (ws *Website) PublishBlobs(ctx context.Context, in *groups.PublishBlobsRequest) (*groups.PublishBlobsResponse, error) {
n, ok := ws.node.Get()
if !ok {
Expand All @@ -265,38 +267,38 @@ func (ws *Website) PublishBlobs(ctx context.Context, in *groups.PublishBlobsRequ
if err != nil {
return nil, err
}
if !ws.allowPush { // if force push is not allowed (default) then only group members can push. Everyone otherwise.
var role groups.Role
if err := db.WithSave(ctx, func(conn *sqlite.Conn) error {
// Get the owner's view of the list of members.
groupID, err := storage.GetKV(ctx, conn, keySiteGroup)
if err != nil || groupID == "" {
return fmt.Errorf("error getting groupID on the site, is the site initialized?: %w", err)
}

var role groups.Role
if err := db.WithSave(ctx, func(conn *sqlite.Conn) error {
// Get the owner's view of the list of members.
groupID, err := storage.GetKV(ctx, conn, keySiteGroup)
if err != nil || groupID == "" {
return fmt.Errorf("error getting groupID on the site, is the site initialized?: %w", err)
}

groupOwner, err := storage.GetKV(ctx, conn, keySiteOwner)
if err != nil || groupOwner == "" {
return fmt.Errorf("error getting group owner on the site, is the site initialized?: %w", err)
}
groupOwner, err := storage.GetKV(ctx, conn, keySiteOwner)
if err != nil || groupOwner == "" {
return fmt.Errorf("error getting group owner on the site, is the site initialized?: %w", err)
}

if groupOwner == callerAccount.String() {
role = groups.Role_OWNER
} else {
r, err := hypersql.GetGroupRole(conn, groupID, "hm://a/"+callerAccount.String())
if err != nil {
return err
if groupOwner == callerAccount.String() {
role = groups.Role_OWNER
} else {
r, err := hypersql.GetGroupRole(conn, groupID, "hm://a/"+callerAccount.String())
if err != nil {
return err
}
role = groups.Role(r)
}
role = groups.Role(r)
return nil
}); err != nil {
return nil, err
}
return nil
}); err != nil {
return nil, err
}

if role != groups.Role_OWNER && role != groups.Role_EDITOR {
return nil, status.Errorf(codes.PermissionDenied, "Caller %q does not have enough permissions to publish to this site.", callerAccount.String())
if role != groups.Role_OWNER && role != groups.Role_EDITOR {
return nil, status.Errorf(codes.PermissionDenied, "Caller %q does not have enough permissions to publish to this site.", callerAccount.String())
}
}

blobs, err := ws.blobs.Await(ctx)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions backend/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,15 @@ type Syncing struct {
TimeoutPerPeer time.Duration
NoPull bool
NoDiscovery bool
AllowPush bool
}

// BindFlags binds the flags to the given FlagSet.
func (c *Syncing) BindFlags(fs *flag.FlagSet) {
fs.DurationVar(&c.WarmupDuration, "syncing.warmup-duration", c.WarmupDuration, "Time to wait before the first sync loop iteration")
fs.DurationVar(&c.Interval, "syncing.interval", c.Interval, "Periodic interval at which sync loop is triggered")
fs.DurationVar(&c.TimeoutPerPeer, "syncing.timeout-per-peer", c.TimeoutPerPeer, "Maximum duration for syncing with a single peer")
fs.BoolVar(&c.AllowPush, "syncing.allow-push", c.AllowPush, "Allows direct content push. Anyone could force push content.")
fs.BoolVar(&c.NoPull, "syncing.no-pull", c.NoPull, "Disables periodic content pulling")
fs.BoolVar(&c.NoDiscovery, "syncing.no-discovery", c.NoDiscovery, "Disables the ability to discover content from other peers")
}
Expand Down
16 changes: 15 additions & 1 deletion backend/daemon/api/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func New(
return nil
}

documentsSrv := documents.NewServer(repo.Identity(), db, &lazyDiscoverer{sync: sync, net: node}, LogLevel)
documentsSrv := documents.NewServer(repo.Identity(), db, &lazyDiscoverer{sync: sync, net: node}, &lazyGwClient{net: node}, LogLevel)
return Server{
Accounts: accounts.NewServer(repo.Identity(), blobs),
Daemon: daemon.NewServer(repo, blobs, wallet, doSync),
Expand All @@ -69,6 +69,20 @@ func New(
}
}

type lazyGwClient struct {
net *future.ReadOnly[*mttnet.Node]
}

// Connect connects to a remote gateway. Necessary here for the grpc server to add a site
// that needs to connect to the site under the hood.
func (ld *lazyGwClient) GatewayClient(ctx context.Context, url string) (mttnet.GatewayClient, error) {
node, ok := ld.net.Get()
if !ok {
return nil, fmt.Errorf("p2p node is not yet initialized")
}
return node.GatewayClient(ctx, url)
}

type lazyDiscoverer struct {
sync *future.ReadOnly[*syncing.Service]
net *future.ReadOnly[*mttnet.Node]
Expand Down
104 changes: 94 additions & 10 deletions backend/daemon/api/documents/v1alpha/documents.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ import (
"context"
"fmt"
"mintter/backend/core"
groups "mintter/backend/daemon/api/groups/v1alpha"
documents "mintter/backend/genproto/documents/v1alpha"
groups_proto "mintter/backend/genproto/groups/v1alpha"
"mintter/backend/mttnet"

"mintter/backend/hyper"
"mintter/backend/hyper/hypersql"
"mintter/backend/logging"
"mintter/backend/pkg/colx"
"mintter/backend/pkg/future"

"crawshaw.io/sqlite"
Expand All @@ -31,21 +36,29 @@ type Discoverer interface {
Connect(context.Context, peer.AddrInfo) error
}

// GatewayClient used to connect to the gateway and push content.
type GatewayClient interface {
// GatewayClient used to connect to the gateway and push content.
GatewayClient(context.Context, string) (mttnet.GatewayClient, error)
}

// Server implements DocumentsServer gRPC API.
type Server struct {
db *sqlitex.Pool
me *future.ReadOnly[core.Identity]
disc Discoverer
blobs *hyper.Storage
db *sqlitex.Pool
me *future.ReadOnly[core.Identity]
disc Discoverer
blobs *hyper.Storage
gwClient GatewayClient
}

// NewServer creates a new RPC handler.
func NewServer(me *future.ReadOnly[core.Identity], db *sqlitex.Pool, disc Discoverer, LogLevel string) *Server {
func NewServer(me *future.ReadOnly[core.Identity], db *sqlitex.Pool, disc Discoverer, gwClient GatewayClient, LogLevel string) *Server {
srv := &Server{
db: db,
me: me,
disc: disc,
blobs: hyper.NewStorage(db, logging.New("mintter/hyper", LogLevel)),
db: db,
me: me,
disc: disc,
blobs: hyper.NewStorage(db, logging.New("mintter/hyper", LogLevel)),
gwClient: gwClient,
}

return srv
Expand Down Expand Up @@ -238,7 +251,7 @@ func (api *Server) GetDraft(ctx context.Context, in *documents.GetDraftRequest)
}

// ListDrafts implements the corresponding gRPC method.
func (api *Server) ListDrafts(ctx context.Context, in *documents.ListDraftsRequest) (*documents.ListDraftsResponse, error) {
func (api *Server) ListDrafts(ctx context.Context, _ *documents.ListDraftsRequest) (*documents.ListDraftsResponse, error) {
entities, err := api.blobs.ListEntities(ctx, "hm://d/*")
if err != nil {
return nil, err
Expand Down Expand Up @@ -443,6 +456,77 @@ func (api *Server) DeletePublication(ctx context.Context, in *documents.DeletePu
return &emptypb.Empty{}, nil
}

// PushPublication implements the corresponding gRPC method.
func (api *Server) PushPublication(ctx context.Context, in *documents.PushPublicationRequest) (*emptypb.Empty, error) {
if in.DocumentId == "" {
return nil, status.Errorf(codes.InvalidArgument, "must specify publication ID")
}

if in.Url == "" {
return nil, status.Errorf(codes.InvalidArgument, "must specify an url")
}

juligasa marked this conversation as resolved.
Show resolved Hide resolved
// If no gwClient is set we can't do anything else.
if api.gwClient == nil {
return nil, status.Errorf(codes.FailedPrecondition, "there is no gwClient definition")
}

eid := hyper.EntityID(in.DocumentId)

entity, err := api.blobs.LoadEntity(ctx, eid)

if err != nil {
return nil, status.Errorf(codes.Internal, "unable to get entity[%s]: %v", eid.String(), err)
}

if entity == nil {
return nil, status.Errorf(codes.NotFound, "no published changes for entity %s", eid.String())
}

conn, cancelFcn, err := api.db.Conn(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "unable to get db connection: %v", err)
}
defer cancelFcn()

gdb, err := hypersql.EntitiesLookupID(conn, entity.ID().String())
if err != nil {
return nil, status.Errorf(codes.NotFound, "unable to find entity id [%s]: %v", entity.ID().String(), err)
}
if gdb.ResourcesID == 0 {
return nil, status.Errorf(codes.NotFound, "document %s not found", entity.ID().String())
}

cids := []cid.Cid{}
err = sqlitex.Exec(conn, groups.QCollectBlobs(), func(stmt *sqlite.Stmt) error {
var (
id int64
codec int64
multihash []byte
)
stmt.Scan(&id, &codec, &multihash)

c := cid.NewCidV1(uint64(codec), multihash)
cids = append(cids, c)
return nil
}, gdb.ResourcesID)
juligasa marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, status.Errorf(codes.NotFound, "couldn't find referenced materials for document %s: %v", entity.ID().String(), err)
}

gc, err := api.gwClient.GatewayClient(ctx, in.Url)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get site client: %v", err)
}

if _, err := gc.PublishBlobs(ctx, &groups_proto.PublishBlobsRequest{
Blobs: colx.SliceMap(cids, cid.Cid.String),
}); err != nil {
return nil, status.Errorf(codes.FailedPrecondition, "failed to push blobs to the gateway: %v", err)
}
return &emptypb.Empty{}, nil
}

// ListPublications implements the corresponding gRPC method.
func (api *Server) ListPublications(ctx context.Context, in *documents.ListPublicationsRequest) (*documents.ListPublicationsResponse, error) {
var (
Expand Down
2 changes: 1 addition & 1 deletion backend/daemon/api/documents/v1alpha/documents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ func newTestDocsAPI(t *testing.T, name string) *Server {
fut := future.New[core.Identity]()
require.NoError(t, fut.Resolve(u.Identity))

srv := NewServer(fut.ReadOnly, db, nil, "debug")
srv := NewServer(fut.ReadOnly, db, nil, nil, "debug")
bs := hyper.NewStorage(db, logging.New("mintter/hyper", "debug"))
_, err := daemon.Register(context.Background(), bs, u.Account, u.Device.PublicKey, time.Now())
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions backend/daemon/api/groups/v1alpha/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (db *DB) ForEachRelatedBlob(ctx context.Context, group hyper.EntityID, fn f
return status.Errorf(codes.NotFound, "group %s not found", group)
}

return sqlitex.Exec(conn, qCollectBlobs(), func(stmt *sqlite.Stmt) error {
return sqlitex.Exec(conn, QCollectBlobs(), func(stmt *sqlite.Stmt) error {
var (
id int64
codec int64
Expand Down Expand Up @@ -152,7 +152,7 @@ func (db *DB) ForEachRelatedBlob(ctx context.Context, group hyper.EntityID, fn f
// if resource_link.is_pinned:
// continue
// enqueue(resource_link.target, kind=0)
var qCollectBlobs = dqb.Str(`
var QCollectBlobs = dqb.Str(`
WITH RECURSIVE selected (id, kind) AS (
SELECT :group, 0
UNION
Expand Down
5 changes: 3 additions & 2 deletions backend/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func Load(ctx context.Context, cfg config.Config, r *storage.Dir, extraOpts ...i

otel.SetTracerProvider(tp)

a.DB, err = initSQLite(ctx, &a.clean, a.Storage.SQLitePath())
a.DB, err = initSQLite(&a.clean, a.Storage.SQLitePath())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -144,6 +144,7 @@ func Load(ctx context.Context, cfg config.Config, r *storage.Dir, extraOpts ...i
extraHTTPHandlers = append(extraHTTPHandlers, httpHandler)
}
}

a.GRPCServer, a.GRPCListener, a.RPC, err = initGRPC(ctx, cfg.GRPC.Port, &a.clean, a.g, a.Storage, a.DB, a.Blobs, a.Net, a.Syncing, a.Wallet, cfg.LogLevel, extraOpts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -253,7 +254,7 @@ func (a *App) Wait() error {
return a.g.Wait()
}

func initSQLite(ctx context.Context, clean *cleanup.Stack, path string) (*sqlitex.Pool, error) {
func initSQLite(clean *cleanup.Stack, path string) (*sqlitex.Pool, error) {
poolSize := int(float64(runtime.NumCPU()) / 2)
if poolSize == 0 {
poolSize = 2
Expand Down
23 changes: 22 additions & 1 deletion backend/daemon/daemon_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
networking "mintter/backend/genproto/networking/v1alpha"
p2p "mintter/backend/genproto/p2p/v1alpha"
"mintter/backend/hyper"
"mintter/backend/ipfs"
"mintter/backend/mttnet"
"mintter/backend/pkg/must"
"mintter/backend/testutil"
Expand Down Expand Up @@ -118,6 +119,27 @@ func TestDaemonListPublications(t *testing.T) {
require.Len(t, list.Publications, 0, "account object must not be listed as publication")
}

func TestDaemonPushPublication(t *testing.T) {
t.Parallel()
t.Skip("Test uses real infra")
cfg := makeTestConfig(t)
cfg.P2P.TestnetName = "dev"
alice := makeTestApp(t, "alice", cfg, true)
ctx := context.Background()

pub := publishDocument(t, ctx, alice)
_, err := alice.RPC.Documents.PushPublication(ctx, &documents.PushPublicationRequest{
DocumentId: pub.Document.Id,
Url: ipfs.TestGateway,
})
require.NoError(t, err)
_, err = alice.RPC.Documents.PushPublication(ctx, &documents.PushPublicationRequest{
DocumentId: pub.Document.Id,
Url: "https://gabo.es/",
})
require.Error(t, err)
}

func TestAPIGetRemotePublication(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -369,7 +391,6 @@ func TestTrustedPeers(t *testing.T) {
require.Equal(t, int64(1), sr.NumSyncOK)
require.Equal(t, int64(0), sr.NumSyncFailed)
require.ElementsMatch(t, []peer.ID{alice.Storage.Device().PeerID(), bob.Storage.Device().PeerID()}, sr.Peers)

}

{
Expand Down
Loading