Skip to content

Commit

Permalink
feat: allows files bigger than 4GiB by sharding
Browse files Browse the repository at this point in the history
Signed-off-by: Bruno Calza <[email protected]>
  • Loading branch information
brunocalza committed Dec 19, 2024
1 parent cfbfae6 commit 82f1b1c
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ run:

# Lint
lint:
go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.0 run
go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.62.2 run
.PHONY: lint

# Build
Expand Down
125 changes: 84 additions & 41 deletions uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
Expand All @@ -17,16 +18,17 @@ import (
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
ucanto_car "github.com/web3-storage/go-ucanto/core/car"
"github.com/web3-storage/go-ucanto/core/delegation"
"github.com/web3-storage/go-ucanto/did"
"github.com/web3-storage/go-ucanto/principal"
"github.com/web3-storage/go-ucanto/principal/ed25519/signer"
"github.com/web3-storage/go-w3up/capability/storeadd"
"github.com/web3-storage/go-w3up/capability/uploadadd"
"github.com/web3-storage/go-w3up/car/sharding"
"github.com/web3-storage/go-w3up/client"
"github.com/web3-storage/go-w3up/cmd/util"
w3sdelegation "github.com/web3-storage/go-w3up/delegation"
Expand Down Expand Up @@ -252,43 +254,101 @@ func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, err error) {
if err != nil {
return cid.Undef, err
}
defer func() {
// Close file and override return error type if it is nil.
if cerr := f.Close(); err == nil {
err = cerr
}
}()

stat, err := f.Stat()
if err != nil {
return cid.Undef, err
}

var shdlnks []ipld.Link

size := uint64(stat.Size())
mh, err := multihash.SumStream(f, multihash.SHA2_256, -1)
if err != nil {
return cid.Undef, err
if size < sharding.ShardSize {
link := storeShard(c.issuer, c.space, f, []delegation.Delegation{c.proof})
shdlnks = append(shdlnks, link)
} else {
_, blocks, err := ucanto_car.Decode(f)
if err != nil {
log.Fatalf("decoding CAR: %s", err)
}
shds, err := sharding.NewSharder([]ipld.Link{}, blocks)
if err != nil {
log.Fatalf("sharding CAR: %s", err)
}

for {
shd, err := shds.Next()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
link := storeShard(c.issuer, c.space, shd, []delegation.Delegation{c.proof})
shdlnks = append(shdlnks, link)
}
}

shardLink := cidlink.Link{Cid: cid.NewCidV1(uint64(multicodec.Car), mh)}
rcpt, err := client.StoreAdd(
rcpt2, err := client.UploadAdd(
c.issuer,
c.space,
&storeadd.Caveat{Link: shardLink, Size: size},
&uploadadd.Caveat{Root: cidlink.Link{Cid: root}, Shards: shdlnks},
client.WithConnection(util.MustGetConnection()),
client.WithProofs([]delegation.Delegation{c.proof}),
)
if err != nil {
return cid.Undef, err
}

if rcpt2.Out().Error() != nil {
return cid.Undef, fmt.Errorf("%s", rcpt2.Out().Error().Message)
}

return root, nil
}

func storeShard(issuer principal.Signer, space did.DID, shard io.Reader, proofs []delegation.Delegation) ipld.Link {

Check failure on line 316 in uploader.go

View workflow job for this annotation

GitHub Actions / lint

undefined: ipld (typecheck)
buf := new(bytes.Buffer)
_, err := buf.ReadFrom(shard)
if err != nil {
log.Fatalf("reading CAR: %s", err)
}

mh, err := multihash.Sum(buf.Bytes(), multihash.SHA2_256, -1)

Check failure on line 323 in uploader.go

View workflow job for this annotation

GitHub Actions / lint

undefined: multihash (typecheck)
if err != nil {
log.Fatalf("hashing CAR: %s", err)
}

link := cidlink.Link{Cid: cid.NewCidV1(0x0202, mh)}

rcpt, err := client.StoreAdd(
issuer,
space,
&storeadd.Caveat{
Link: link,
Size: uint64(buf.Len()),
},
client.WithConnection(util.MustGetConnection()),
client.WithProofs(proofs),
)
if err != nil {
log.Fatalf("store/add %s: %s", link, err)
}

if rcpt.Out().Error() != nil {
return cid.Undef, fmt.Errorf(rcpt.Out().Error().Message)
log.Fatalf("%+v\n", rcpt.Out().Error())
}

if rcpt.Out().Ok().Status == "upload" {
_, err := f.Seek(0, io.SeekStart)
hr, err := http.NewRequest("PUT", *rcpt.Out().Ok().Url, bytes.NewReader(buf.Bytes()))
if err != nil {
return cid.Undef, err
}

hr, err := http.NewRequest("PUT", *rcpt.Out().Ok().Url, f)
if err != nil {
return cid.Undef, err
log.Fatalf("creating HTTP request: %s", err)
}

hdr := map[string][]string{}
Expand All @@ -298,39 +358,22 @@ func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, err error) {
}
hdr[k] = []string{v}
}

hr.Header = hdr
hr.ContentLength = int64(size)
httpClient := http.Client{
Timeout: 0,
}
hr.ContentLength = int64(buf.Len())
httpClient := http.Client{}
res, err := httpClient.Do(hr)
if err != nil {
return cid.Undef, err
log.Fatalf("doing HTTP request: %s", err)
}

if res.StatusCode != 200 {
return cid.Undef, fmt.Errorf("status code: %d", res.StatusCode)
log.Fatalf("non-200 status code while uploading file: %d", res.StatusCode)
}

if err := res.Body.Close(); err != nil {
return cid.Undef, fmt.Errorf("closing request body: %s", err)
err = res.Body.Close()
if err != nil {
log.Fatalf("closing request body: %s", err)
}
}

rcpt2, err := client.UploadAdd(
c.issuer,
c.space,
&uploadadd.Caveat{Root: cidlink.Link{Cid: root}, Shards: []datamodel.Link{shardLink}},
client.WithConnection(util.MustGetConnection()),
client.WithProofs([]delegation.Delegation{c.proof}),
)
if err != nil {
return cid.Undef, err
}

if rcpt2.Out().Error() != nil {
return cid.Undef, fmt.Errorf(rcpt2.Out().Error().Message)
}

return shardLink.Cid, nil
return link
}

0 comments on commit 82f1b1c

Please sign in to comment.