about summary refs log tree commit diff
path: root/tvix/store
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-05-10T05·24+0300
committerclbot <clbot@tvl.fyi>2024-05-11T15·07+0000
commit96b8b1a205e396c94344f9d70bade26d55fc98f4 (patch)
tree95d125b0ad86d58c0b50010790f1d3edbf59fe8c /tvix/store
parent11850249c8168ee91fafb0eef5ce23f0e4be618f (diff)
refactor(tvix/store/pathinfo/memory): tokio RwLock, improve list() r/8112
We don't want to use the std::sync::RwLock here, as it blocks.

This also means we don't need to deal with the error cases anymore.

The list() implementation is updated to use try_stream, which means we
can now avoid collecting everything into a Vec before returning from it.

Change-Id: I9057dcc410dc553e6b1be3f20d5ee830569e8218
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11611
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store')
-rw-r--r--tvix/store/src/pathinfoservice/memory.rs28
1 files changed, 14 insertions, 14 deletions
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
index 203611b85443..f1bbf67f8147 100644
--- a/tvix/store/src/pathinfoservice/memory.rs
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -1,10 +1,9 @@
 use super::PathInfoService;
 use crate::proto::PathInfo;
-use futures::stream::{iter, BoxStream};
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-};
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::RwLock;
 use tonic::async_trait;
 use tvix_castore::Error;
 
@@ -16,7 +15,7 @@ pub struct MemoryPathInfoService {
 #[async_trait]
 impl PathInfoService for MemoryPathInfoService {
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
-        let db = self.db.read().unwrap();
+        let db = self.db.read().await;
 
         match db.get(&digest) {
             None => Ok(None),
@@ -35,7 +34,7 @@ impl PathInfoService for MemoryPathInfoService {
             // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
             // This overwrites existing PathInfo objects.
             Ok(nix_path) => {
-                let mut db = self.db.write().unwrap();
+                let mut db = self.db.write().await;
                 db.insert(*nix_path.digest(), path_info.clone());
 
                 Ok(path_info)
@@ -44,14 +43,15 @@ impl PathInfoService for MemoryPathInfoService {
     }
 
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
-        let db = self.db.read().unwrap();
+        let db = self.db.clone();
 
-        // Copy all elements into a list.
-        // This is a bit ugly, because we can't have db escape the lifetime
-        // of this function, but elements need to be returned owned anyways, and this in-
-        // memory impl is only for testing purposes anyways.
-        let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect();
+        Box::pin(try_stream! {
+            let db = db.read().await;
+            let it = db.iter();
 
-        Box::pin(iter(items))
+            for (_k, v) in it {
+                yield v.clone()
+            }
+        })
     }
 }