about summary refs log blame commit diff
path: root/tvix/nar-bridge/pkg/importer/blob_upload.go
blob: c1255dd3ad5d624bd79de9b78117180a2fe7f3b8 (plain) (tree)
1
2
3
4
5
6
7
8
9
                

        
               

                         
                
             
            
 
                                                  

                                        
 

                                                                          
 
                                                                       
                                 
                                                                                                                              
                                                           

                                                                                



                                                         
                                                                                  
                 






                                                                  
                                                                                                 






                                                                                                              
                                                                             

                                                        
                                                                                             







                                                                                          
                 
 

                                                  
                                                                            

                 


                                                                                           
                                         
 
                                            

         
package importer

import (
	"bufio"
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"io"

	castorev1pb "code.tvl.fyi/tvix/castore-go"
	log "github.com/sirupsen/logrus"
)

// the size of individual BlobChunk we send when uploading to BlobService.
const chunkSize = 1024 * 1024

// this produces a callback function that can be used as blobCb for the
// importer.Import function call.
func GenBlobUploaderCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) ([]byte, error) {
	return func(blobReader io.Reader) ([]byte, error) {
		// Ensure the blobReader is buffered to at least the chunk size.
		blobReader = bufio.NewReaderSize(blobReader, chunkSize)

		putter, err := blobServiceClient.Put(ctx)
		if err != nil {
			// return error to the importer
			return nil, fmt.Errorf("error from blob service: %w", err)
		}

		blobSize := 0
		chunk := make([]byte, chunkSize)

		for {
			n, err := blobReader.Read(chunk)
			if err != nil && !errors.Is(err, io.EOF) {
				return nil, fmt.Errorf("unable to read from blobreader: %w", err)
			}

			if n != 0 {
				log.WithField("chunk_size", n).Debug("sending chunk")
				blobSize += n

				// send the blob chunk to the server. The err is only valid in the inner scope
				if err := putter.Send(&castorev1pb.BlobChunk{
					Data: chunk[:n],
				}); err != nil {
					return nil, fmt.Errorf("sending blob chunk: %w", err)
				}
			}

			// if our read from blobReader returned an EOF, we're done reading
			if errors.Is(err, io.EOF) {
				break
			}

		}

		resp, err := putter.CloseAndRecv()
		if err != nil {
			return nil, fmt.Errorf("close blob putter: %w", err)
		}

		log.WithFields(log.Fields{
			"blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()),
			"blob_size":   blobSize,
		}).Debug("uploaded blob")

		return resp.GetDigest(), nil
	}
}