diff options
Diffstat (limited to 'tvix/store/src/pathinfoservice/memory.rs')
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 67 |
1 files changed, 22 insertions, 45 deletions
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index f8435dbbf8..3de3221df2 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -1,40 +1,24 @@ use super::PathInfoService; -use crate::{nar::calculate_size_and_sha256, proto::PathInfo}; -use futures::stream::{iter, BoxStream}; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use crate::proto::PathInfo; +use async_stream::try_stream; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; use tonic::async_trait; -use tvix_castore::proto as castorepb; +use tracing::instrument; use tvix_castore::Error; -use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; -pub struct MemoryPathInfoService<BS, DS> { +#[derive(Default)] +pub struct MemoryPathInfoService { db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>, - - blob_service: BS, - directory_service: DS, -} - -impl<BS, DS> MemoryPathInfoService<BS, DS> { - pub fn new(blob_service: BS, directory_service: DS) -> Self { - Self { - db: Default::default(), - blob_service, - directory_service, - } - } } #[async_trait] -impl<BS, DS> PathInfoService for MemoryPathInfoService<BS, DS> -where - BS: AsRef<dyn BlobService> + Send + Sync, - DS: AsRef<dyn DirectoryService> + Send + Sync, -{ +impl PathInfoService for MemoryPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { - let db = self.db.read().unwrap(); + let db = self.db.read().await; match db.get(&digest) { None => Ok(None), @@ -42,6 +26,7 @@ where } } + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { // Call validate on the received PathInfo message. match path_info.validate() { @@ -53,7 +38,7 @@ where // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. // This overwrites existing PathInfo objects. Ok(nix_path) => { - let mut db = self.db.write().unwrap(); + let mut db = self.db.write().await; db.insert(*nix_path.digest(), path_info.clone()); Ok(path_info) @@ -61,24 +46,16 @@ where } } - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service) - .await - .map_err(|e| Error::StorageError(e.to_string())) - } - fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { - let db = self.db.read().unwrap(); + let db = self.db.clone(); - // Copy all elements into a list. - // This is a bit ugly, because we can't have db escape the lifetime - // of this function, but elements need to be returned owned anyways, and this in- - // memory impl is only for testing purposes anyways. - let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); + Box::pin(try_stream! { + let db = db.read().await; + let it = db.iter(); - Box::pin(iter(items)) + for (_k, v) in it { + yield v.clone() + } + }) } } |