diff options
author | Florian Klink <flokli@flokli.de> | 2024-05-10T05·24+0300 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-05-11T15·07+0000 |
commit | 96b8b1a205e396c94344f9d70bade26d55fc98f4 (patch) | |
tree | 95d125b0ad86d58c0b50010790f1d3edbf59fe8c | |
parent | 11850249c8168ee91fafb0eef5ce23f0e4be618f (diff) |
refactor(tvix/store/pathinfo/memory): tokio RwLock, improve list() r/8112
We don't want to use the std::sync::RwLock here, as it blocks. This also means we don't need to deal with the error cases anymore. The list() implementation is updated to use try_stream, which means we can now avoid collecting everything into a Vec before returning from it. Change-Id: I9057dcc410dc553e6b1be3f20d5ee830569e8218 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11611 Reviewed-by: Connor Brewster <cbrewster@hey.com> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index 203611b85443..f1bbf67f8147 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -1,10 +1,9 @@ use super::PathInfoService; use crate::proto::PathInfo; -use futures::stream::{iter, BoxStream}; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use async_stream::try_stream; +use futures::stream::BoxStream; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; use tonic::async_trait; use tvix_castore::Error; @@ -16,7 +15,7 @@ pub struct MemoryPathInfoService { #[async_trait] impl PathInfoService for MemoryPathInfoService { async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { - let db = self.db.read().unwrap(); + let db = self.db.read().await; match db.get(&digest) { None => Ok(None), @@ -35,7 +34,7 @@ impl PathInfoService for MemoryPathInfoService { // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. // This overwrites existing PathInfo objects. Ok(nix_path) => { - let mut db = self.db.write().unwrap(); + let mut db = self.db.write().await; db.insert(*nix_path.digest(), path_info.clone()); Ok(path_info) @@ -44,14 +43,15 @@ impl PathInfoService for MemoryPathInfoService { } fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { - let db = self.db.read().unwrap(); + let db = self.db.clone(); - // Copy all elements into a list. - // This is a bit ugly, because we can't have db escape the lifetime - // of this function, but elements need to be returned owned anyways, and this in- - // memory impl is only for testing purposes anyways. - let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); + Box::pin(try_stream! { + let db = db.read().await; + let it = db.iter(); - Box::pin(iter(items)) + for (_k, v) in it { + yield v.clone() + } + }) } } |