package importer
import (
"bufio"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
castorev1pb "code.tvl.fyi/tvix/castore-go"
log "github.com/sirupsen/logrus"
)
// the size of individual BlobChunk we send when uploading to BlobService.
const chunkSize = 1024 * 1024
// this produces a callback function that can be used as blobCb for the
// importer.Import function call.
func GenBlobUploaderCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) ([]byte, error) {
return func(blobReader io.Reader) ([]byte, error) {
// Ensure the blobReader is buffered to at least the chunk size.
blobReader = bufio.NewReaderSize(blobReader, chunkSize)
putter, err := blobServiceClient.Put(ctx)
if err != nil {
// return error to the importer
return nil, fmt.Errorf("error from blob service: %w", err)
}
blobSize := 0
chunk := make([]byte, chunkSize)
for {
n, err := blobReader.Read(chunk)
if err != nil && !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("unable to read from blobreader: %w", err)
}
if n != 0 {
log.WithField("chunk_size", n).Debug("sending chunk")
blobSize += n
// send the blob chunk to the server. The err is only valid in the inner scope
if err := putter.Send(&castorev1pb.BlobChunk{
Data: chunk[:n],
}); err != nil {
return nil, fmt.Errorf("sending blob chunk: %w", err)
}
}
// if our read from blobReader returned an EOF, we're done reading
if errors.Is(err, io.EOF) {
break
}
}
resp, err := putter.CloseAndRecv()
if err != nil {
return nil, fmt.Errorf("close blob putter: %w", err)
}
log.WithFields(log.Fields{
"blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()),
"blob_size": blobSize,
}).Debug("uploaded blob")
return resp.GetDigest(), nil
}
}