about summary refs log tree commit diff
path: root/tvix/store/src/directoryservice
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-05-18T18·43+0300
committerclbot <clbot@tvl.fyi>2023-05-23T10·48+0000
commitb8ff08b1b0d2dbd8dd546dc9cbdea2f11304d5c8 (patch)
tree89dc726322124a407701c7040e06b3484ea9ba61 /tvix/store/src/directoryservice
parente779b866ccb1d3bbe1a349d2dfa90855e9a436b2 (diff)
refactor(tvix/store/directorysvc): move from Vec<u8> to B3Digest r/6177
This introduces a new struct, B3Digest, which internally holds a
Vec<u8>, but only allows construction with 32 bytes.

It also implements display, which will print the base64 representation.
This should reduce some boilerplate when parsing Vec<u8>.

Change-Id: Ia91aa40cb691916773abc8f93e6ed79a5fd34863
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8592
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/directoryservice')
-rw-r--r--tvix/store/src/directoryservice/grpc.rs79
-rw-r--r--tvix/store/src/directoryservice/memory.rs32
-rw-r--r--tvix/store/src/directoryservice/mod.rs10
-rw-r--r--tvix/store/src/directoryservice/sled.rs32
-rw-r--r--tvix/store/src/directoryservice/traverse.rs13
-rw-r--r--tvix/store/src/directoryservice/utils.rs36
6 files changed, 95 insertions, 107 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs
index d4ac9fd925fd..46e19224c759 100644
--- a/tvix/store/src/directoryservice/grpc.rs
+++ b/tvix/store/src/directoryservice/grpc.rs
@@ -2,8 +2,7 @@ use std::collections::HashSet;
 
 use super::{DirectoryPutter, DirectoryService};
 use crate::proto::{self, get_directory_request::ByWhat};
-use crate::Error;
-use data_encoding::BASE64;
+use crate::{B3Digest, Error};
 use tokio::sync::mpsc::UnboundedSender;
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tonic::{transport::Channel, Status};
@@ -38,16 +37,16 @@ impl GRPCDirectoryService {
 impl DirectoryService for GRPCDirectoryService {
     type DirectoriesIterator = StreamIterator;
 
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<crate::proto::Directory>, crate::Error> {
+    fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
-        let digest = digest.to_owned();
 
+        let digest_as_vec = digest.to_vec();
         let task = self.tokio_handle.spawn(async move {
             let mut s = grpc_client
                 .get(proto::GetDirectoryRequest {
                     recursive: false,
-                    by_what: Some(ByWhat::Digest(digest.to_vec())),
+                    by_what: Some(ByWhat::Digest(digest_as_vec)),
                 })
                 .await?
                 .into_inner();
@@ -56,6 +55,7 @@ impl DirectoryService for GRPCDirectoryService {
             s.message().await
         });
 
+        let digest = digest.clone();
         match self.tokio_handle.block_on(task)? {
             Ok(Some(directory)) => {
                 // Validate the retrieved Directory indeed has the
@@ -64,16 +64,14 @@ impl DirectoryService for GRPCDirectoryService {
                 if actual_digest != digest {
                     Err(crate::Error::StorageError(format!(
                         "requested directory with digest {}, but got {}",
-                        BASE64.encode(&digest),
-                        BASE64.encode(&actual_digest)
+                        digest, actual_digest
                     )))
                 } else if let Err(e) = directory.validate() {
                     // Validate the Directory itself is valid.
                     warn!("directory failed validation: {}", e.to_string());
                     Err(crate::Error::StorageError(format!(
                         "directory {} failed validation: {}",
-                        BASE64.encode(&digest),
-                        e,
+                        digest, e,
                     )))
                 } else {
                     Ok(Some(directory))
@@ -85,7 +83,7 @@ impl DirectoryService for GRPCDirectoryService {
         }
     }
 
-    fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> {
+    fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
         let mut grpc_client = self.grpc_client.clone();
 
         let task = self
@@ -93,29 +91,27 @@ impl DirectoryService for GRPCDirectoryService {
             .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await });
 
         match self.tokio_handle.block_on(task)? {
-            Ok(put_directory_resp) => Ok(put_directory_resp
-                .into_inner()
-                .root_digest
-                .as_slice()
-                .try_into()
-                .map_err(|_| {
-                    Error::StorageError("invalid root digest length in response".to_string())
-                })?),
+            Ok(put_directory_resp) => Ok(B3Digest::from_vec(
+                put_directory_resp.into_inner().root_digest,
+            )
+            .map_err(|_| {
+                Error::StorageError("invalid root digest length in response".to_string())
+            })?),
             Err(e) => Err(crate::Error::StorageError(e.to_string())),
         }
     }
 
-    #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))]
-    fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator {
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator {
         let mut grpc_client = self.grpc_client.clone();
-        let root_directory_digest = root_directory_digest.to_owned();
 
+        let root_directory_digest_as_vec = root_directory_digest.to_vec();
         let task: tokio::task::JoinHandle<Result<Streaming<proto::Directory>, Status>> =
             self.tokio_handle.spawn(async move {
                 let s = grpc_client
                     .get(proto::GetDirectoryRequest {
                         recursive: true,
-                        by_what: Some(ByWhat::Digest(root_directory_digest.to_vec())),
+                        by_what: Some(ByWhat::Digest(root_directory_digest_as_vec)),
                     })
                     .await?
                     .into_inner();
@@ -125,7 +121,11 @@ impl DirectoryService for GRPCDirectoryService {
 
         let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
 
-        StreamIterator::new(self.tokio_handle.clone(), &root_directory_digest, stream)
+        StreamIterator::new(
+            self.tokio_handle.clone(),
+            root_directory_digest.clone(),
+            stream,
+        )
     }
 
     type DirectoryPutter = GRPCPutter;
@@ -159,22 +159,22 @@ pub struct StreamIterator {
     // A stream of [proto::Directory]
     stream: Streaming<proto::Directory>,
     // The Directory digests we received so far
-    received_directory_digests: HashSet<[u8; 32]>,
+    received_directory_digests: HashSet<B3Digest>,
     // The Directory digests we're still expecting to get sent.
-    expected_directory_digests: HashSet<[u8; 32]>,
+    expected_directory_digests: HashSet<B3Digest>,
 }
 
 impl StreamIterator {
     pub fn new(
         tokio_handle: tokio::runtime::Handle,
-        root_digest: &[u8; 32],
+        root_digest: B3Digest,
         stream: Streaming<proto::Directory>,
     ) -> Self {
         Self {
             tokio_handle,
             stream,
             received_directory_digests: HashSet::new(),
-            expected_directory_digests: HashSet::from([*root_digest]),
+            expected_directory_digests: HashSet::from([root_digest]),
         }
     }
 }
@@ -190,7 +190,7 @@ impl Iterator for StreamIterator {
                     if let Err(e) = directory.validate() {
                         return Some(Err(crate::Error::StorageError(format!(
                             "directory {} failed validation: {}",
-                            BASE64.encode(&directory.digest()),
+                            directory.digest(),
                             e,
                         ))));
                     }
@@ -204,16 +204,19 @@ impl Iterator for StreamIterator {
                         // means it once was in expected_directory_digests)
                         return Some(Err(crate::Error::StorageError(format!(
                             "received unexpected directory {}",
-                            BASE64.encode(&directory_digest)
+                            directory_digest
                         ))));
                     }
                     self.received_directory_digests.insert(directory_digest);
 
                     // register all children in expected_directory_digests.
-                    // We ran validate() above, so we know these digests must be correct.
                     for child_directory in &directory.directories {
+                        // We ran validate() above, so we know these digests must be correct.
+                        let child_directory_digest =
+                            B3Digest::from_vec(child_directory.digest.clone()).unwrap();
+
                         self.expected_directory_digests
-                            .insert(child_directory.digest.clone().try_into().unwrap());
+                            .insert(child_directory_digest);
                     }
 
                     Some(Ok(directory))
@@ -294,7 +297,7 @@ impl DirectoryPutter for GRPCPutter {
     }
 
     /// Closes the stream for sending, and returns the value
-    fn close(&mut self) -> Result<[u8; 32], crate::Error> {
+    fn close(&mut self) -> Result<B3Digest, crate::Error> {
         // get self.rq, and replace it with None.
         // This ensures we can only close it once.
         match std::mem::take(&mut self.rq) {
@@ -303,15 +306,15 @@ impl DirectoryPutter for GRPCPutter {
                 // close directory_sender, so blocking on task will finish.
                 drop(directory_sender);
 
-                Ok(self
+                let root_digest = self
                     .tokio_handle
                     .block_on(task)?
                     .map_err(|e| Error::StorageError(e.to_string()))?
-                    .root_digest
-                    .try_into()
-                    .map_err(|_| {
-                        Error::StorageError("invalid root digest length in response".to_string())
-                    })?)
+                    .root_digest;
+
+                B3Digest::from_vec(root_digest).map_err(|_| {
+                    Error::StorageError("invalid root digest length in response".to_string())
+                })
             }
         }
     }
diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs
index 2b4668a15ecf..524c7664c651 100644
--- a/tvix/store/src/directoryservice/memory.rs
+++ b/tvix/store/src/directoryservice/memory.rs
@@ -1,5 +1,4 @@
-use crate::{proto, Error};
-use data_encoding::BASE64;
+use crate::{proto, B3Digest, Error};
 use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
 use tracing::{instrument, warn};
@@ -9,17 +8,17 @@ use super::{DirectoryService, DirectoryTraverser};
 
 #[derive(Clone, Default)]
 pub struct MemoryDirectoryService {
-    db: Arc<RwLock<HashMap<[u8; 32], proto::Directory>>>,
+    db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>,
 }
 
 impl DirectoryService for MemoryDirectoryService {
     type DirectoriesIterator = DirectoryTraverser<Self>;
 
-    #[instrument(skip(self, digest), fields(directory.digest = BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error> {
+    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
+    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
         let db = self.db.read()?;
 
-        match db.get(digest) {
+        match db.get(&digest) {
             // The directory was not found, return
             None => Ok(None),
 
@@ -28,11 +27,10 @@ impl DirectoryService for MemoryDirectoryService {
                 // Validate the retrieved Directory indeed has the
                 // digest we expect it to have, to detect corruptions.
                 let actual_digest = directory.digest();
-                if actual_digest.as_slice() != digest {
+                if actual_digest != *digest {
                     return Err(Error::StorageError(format!(
                         "requested directory with digest {}, but got {}",
-                        BASE64.encode(digest),
-                        BASE64.encode(&actual_digest)
+                        digest, actual_digest
                     )));
                 }
 
@@ -41,8 +39,7 @@ impl DirectoryService for MemoryDirectoryService {
                     warn!("directory failed validation: {}", e.to_string());
                     return Err(Error::StorageError(format!(
                         "directory {} failed validation: {}",
-                        BASE64.encode(&actual_digest),
-                        e,
+                        actual_digest, e,
                     )));
                 }
 
@@ -51,28 +48,27 @@ impl DirectoryService for MemoryDirectoryService {
         }
     }
 
-    #[instrument(skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))]
-    fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error> {
+    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
+    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
         let digest = directory.digest();
 
         // validate the directory itself.
         if let Err(e) = directory.validate() {
             return Err(Error::InvalidRequest(format!(
                 "directory {} failed validation: {}",
-                BASE64.encode(&digest),
-                e,
+                digest, e,
             )));
         }
 
         // store it
         let mut db = self.db.write()?;
-        db.insert(digest, directory);
+        db.insert(digest.clone(), directory);
 
         Ok(digest)
     }
 
-    #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))]
-    fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator {
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator {
         DirectoryTraverser::with(self.clone(), root_directory_digest)
     }
 
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs
index e8f11269ec1d..f387d28948f0 100644
--- a/tvix/store/src/directoryservice/mod.rs
+++ b/tvix/store/src/directoryservice/mod.rs
@@ -1,4 +1,4 @@
-use crate::{proto, Error};
+use crate::{proto, B3Digest, Error};
 mod grpc;
 mod memory;
 mod sled;
@@ -20,16 +20,16 @@ pub trait DirectoryService {
 
     /// Get looks up a single Directory message by its digest.
     /// In case the directory is not found, Ok(None) is returned.
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error>;
+    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
     /// Get uploads a single Directory message, and returns the calculated
     /// digest, or an error.
-    fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error>;
+    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
 
     /// Looks up a closure of [proto::Directory].
     /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`,
     /// and we'd be able to add a default implementation for it here, but
     /// we can't have that yet.
-    fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator;
+    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator;
 
     /// Allows persisting a closure of [proto::Directory], which is a graph of
     /// connected Directory messages.
@@ -50,5 +50,5 @@ pub trait DirectoryPutter {
     fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
 
     /// Close the stream, and wait for any errors.
-    fn close(&mut self) -> Result<[u8; 32], Error>;
+    fn close(&mut self) -> Result<B3Digest, Error>;
 }
diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs
index d060232307b1..e737ff9b8c99 100644
--- a/tvix/store/src/directoryservice/sled.rs
+++ b/tvix/store/src/directoryservice/sled.rs
@@ -1,6 +1,5 @@
 use crate::proto::Directory;
-use crate::{proto, Error};
-use data_encoding::BASE64;
+use crate::{proto, B3Digest, Error};
 use prost::Message;
 use std::path::PathBuf;
 use tracing::{instrument, warn};
@@ -32,9 +31,9 @@ impl SledDirectoryService {
 impl DirectoryService for SledDirectoryService {
     type DirectoriesIterator = DirectoryTraverser<Self>;
 
-    #[instrument(name = "SledDirectoryService::get", skip(self, digest), fields(directory.digest = BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error> {
-        match self.db.get(digest) {
+    #[instrument(name = "SledDirectoryService::get", skip(self, digest), fields(directory.digest = %digest))]
+    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+        match self.db.get(digest.to_vec()) {
             // The directory was not found, return
             Ok(None) => Ok(None),
 
@@ -44,11 +43,10 @@ impl DirectoryService for SledDirectoryService {
                     // Validate the retrieved Directory indeed has the
                     // digest we expect it to have, to detect corruptions.
                     let actual_digest = directory.digest();
-                    if actual_digest.as_slice() != digest {
+                    if actual_digest != *digest {
                         return Err(Error::StorageError(format!(
                             "requested directory with digest {}, but got {}",
-                            BASE64.encode(digest),
-                            BASE64.encode(&actual_digest)
+                            digest, actual_digest
                         )));
                     }
 
@@ -57,15 +55,14 @@ impl DirectoryService for SledDirectoryService {
                         warn!("directory failed validation: {}", e.to_string());
                         return Err(Error::StorageError(format!(
                             "directory {} failed validation: {}",
-                            BASE64.encode(&actual_digest),
-                            e,
+                            actual_digest, e,
                         )));
                     }
 
                     Ok(Some(directory))
                 }
                 Err(e) => {
-                    warn!("unable to parse directory {}: {}", BASE64.encode(digest), e);
+                    warn!("unable to parse directory {}: {}", digest, e);
                     Err(Error::StorageError(e.to_string()))
                 }
             },
@@ -74,28 +71,27 @@ impl DirectoryService for SledDirectoryService {
         }
     }
 
-    #[instrument(name = "SledDirectoryService::put", skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))]
-    fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error> {
+    #[instrument(name = "SledDirectoryService::put", skip(self, directory), fields(directory.digest = %directory.digest()))]
+    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
         let digest = directory.digest();
 
         // validate the directory itself.
         if let Err(e) = directory.validate() {
             return Err(Error::InvalidRequest(format!(
                 "directory {} failed validation: {}",
-                BASE64.encode(&digest),
-                e,
+                digest, e,
             )));
         }
         // store it
-        let result = self.db.insert(digest, directory.encode_to_vec());
+        let result = self.db.insert(digest.to_vec(), directory.encode_to_vec());
         if let Err(e) = result {
             return Err(Error::StorageError(e.to_string()));
         }
         Ok(digest)
     }
 
-    #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))]
-    fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator {
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator {
         DirectoryTraverser::with(self.clone(), root_directory_digest)
     }
 
diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs
index 46bf0c4b6329..f6193f091817 100644
--- a/tvix/store/src/directoryservice/traverse.rs
+++ b/tvix/store/src/directoryservice/traverse.rs
@@ -1,5 +1,5 @@
 use super::DirectoryService;
-use crate::{proto::NamedNode, Error};
+use crate::{proto::NamedNode, B3Digest, Error};
 use tracing::{instrument, warn};
 
 /// This traverses from a (root) node to the given (sub)path, returning the Node
@@ -39,21 +39,18 @@ pub fn traverse_to<DS: DirectoryService>(
                     Ok(None)
                 }
                 crate::proto::node::Node::Directory(directory_node) => {
-                    // fetch the linked node from the directory_service
-                    let digest: [u8; 32] = directory_node
-                        .digest
-                        .try_into()
+                    let digest = B3Digest::from_vec(directory_node.digest)
                         .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
 
+                    // fetch the linked node from the directory_service
                     match directory_service.get(&digest)? {
                         // If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
                         None => {
-                            let digest_b64 = data_encoding::BASE64.encode(&digest);
-                            warn!("directory {} does not exist", digest_b64);
+                            warn!("directory {} does not exist", digest);
 
                             Err(Error::StorageError(format!(
                                 "directory {} does not exist",
-                                digest_b64
+                                digest
                             )))
                         }
                         Some(directory) => {
diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs
index 7d41e8d3ba01..3661808734f3 100644
--- a/tvix/store/src/directoryservice/utils.rs
+++ b/tvix/store/src/directoryservice/utils.rs
@@ -1,6 +1,7 @@
 use super::DirectoryPutter;
 use super::DirectoryService;
 use crate::proto;
+use crate::B3Digest;
 use crate::Error;
 use std::collections::{HashSet, VecDeque};
 use tracing::{debug_span, instrument, warn};
@@ -13,17 +14,17 @@ pub struct DirectoryTraverser<DS: DirectoryService> {
     /// The list of all directories that still need to be traversed. The next
     /// element is picked from the front, new elements are enqueued at the
     /// back.
-    worklist_directory_digests: VecDeque<[u8; 32]>,
+    worklist_directory_digests: VecDeque<B3Digest>,
     /// The list of directory digests already sent to the consumer.
     /// We omit sending the same directories multiple times.
-    sent_directory_digests: HashSet<[u8; 32]>,
+    sent_directory_digests: HashSet<B3Digest>,
 }
 
 impl<DS: DirectoryService> DirectoryTraverser<DS> {
-    pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self {
+    pub fn with(directory_service: DS, root_directory_digest: &B3Digest) -> Self {
         Self {
             directory_service,
-            worklist_directory_digests: VecDeque::from([*root_directory_digest]),
+            worklist_directory_digests: VecDeque::from([root_directory_digest.clone()]),
             sent_directory_digests: HashSet::new(),
         }
     }
@@ -33,12 +34,8 @@ impl<DS: DirectoryService> DirectoryTraverser<DS> {
     // This panics if the digest looks invalid, it's supposed to be checked first.
     fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
         for child_directory_node in &directory.directories {
-            let child_digest: [u8; 32] = child_directory_node
-                .digest
-                .as_slice()
-                .try_into()
-                .map_err(|_e| Error::StorageError("invalid digest length".to_string()))
-                .unwrap();
+            // TODO: propagate error
+            let child_digest = B3Digest::from_vec(child_directory_node.digest.clone()).unwrap();
 
             if self.worklist_directory_digests.contains(&child_digest)
                 || self.sent_directory_digests.contains(&child_digest)
@@ -59,8 +56,7 @@ impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
         match self.worklist_directory_digests.pop_front() {
             None => None,
             Some(current_directory_digest) => {
-                let current_directory_b64 = data_encoding::BASE64.encode(&current_directory_digest);
-                let span = debug_span!("directory.digest", current_directory_b64);
+                let span = debug_span!("directory.digest", "{}", current_directory_digest);
                 let _ = span.enter();
 
                 // look up the directory itself.
@@ -73,24 +69,24 @@ impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
                             warn!("directory failed validation: {}", e.to_string());
                             return Some(Err(Error::StorageError(format!(
                                 "invalid directory: {}",
-                                current_directory_b64
+                                current_directory_digest
                             ))));
                         }
                         current_directory
                     }
                     // if it's not there, we have an inconsistent store!
                     Ok(None) => {
-                        warn!("directory {} does not exist", current_directory_b64);
+                        warn!("directory {} does not exist", current_directory_digest);
                         return Some(Err(Error::StorageError(format!(
                             "directory {} does not exist",
-                            current_directory_b64
+                            current_directory_digest
                         ))));
                     }
                     Err(e) => {
                         warn!("failed to look up directory");
                         return Some(Err(Error::StorageError(format!(
                             "unable to look up directory {}: {}",
-                            current_directory_b64, e
+                            current_directory_digest, e
                         ))));
                     }
                 };
@@ -110,7 +106,7 @@ impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
 /// TODO: verify connectivity? Factor out these checks into generic helpers?
 pub struct SimplePutter<DS: DirectoryService> {
     directory_service: DS,
-    last_directory_digest: Option<[u8; 32]>,
+    last_directory_digest: Option<B3Digest>,
 }
 
 impl<DS: DirectoryService> SimplePutter<DS> {
@@ -133,9 +129,9 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
     }
 
     /// We need to be mutable here, as that's the signature of the trait.
-    fn close(&mut self) -> Result<[u8; 32], Error> {
-        match self.last_directory_digest {
-            Some(last_digest) => Ok(last_digest),
+    fn close(&mut self) -> Result<B3Digest, Error> {
+        match &self.last_directory_digest {
+            Some(last_digest) => Ok(last_digest.clone()),
             None => Err(Error::InvalidRequest(
                 "no directories sent, can't show root digest".to_string(),
             )),