From 96b8b1a205e396c94344f9d70bade26d55fc98f4 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 10 May 2024 08:24:24 +0300 Subject: refactor(tvix/store/pathinfo/memory): tokio RwLock, improve list() We don't want to use the std::sync::RwLock here, as it blocks. This also means we don't need to deal with the error cases anymore. The list() implementation is updated to use try_stream, which means we can now avoid collecting everything into a Vec before returning from it. Change-Id: I9057dcc410dc553e6b1be3f20d5ee830569e8218 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11611 Reviewed-by: Connor Brewster Autosubmit: flokli Tested-by: BuildkiteCI --- tvix/store/src/pathinfoservice/memory.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) (limited to 'tvix/store/src/pathinfoservice') diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index 203611b85443..f1bbf67f8147 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -1,10 +1,9 @@ use super::PathInfoService; use crate::proto::PathInfo; -use futures::stream::{iter, BoxStream}; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use async_stream::try_stream; +use futures::stream::BoxStream; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; use tonic::async_trait; use tvix_castore::Error; @@ -16,7 +15,7 @@ pub struct MemoryPathInfoService { #[async_trait] impl PathInfoService for MemoryPathInfoService { async fn get(&self, digest: [u8; 20]) -> Result, Error> { - let db = self.db.read().unwrap(); + let db = self.db.read().await; match db.get(&digest) { None => Ok(None), @@ -35,7 +34,7 @@ impl PathInfoService for MemoryPathInfoService { // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. // This overwrites existing PathInfo objects. Ok(nix_path) => { - let mut db = self.db.write().unwrap(); + let mut db = self.db.write().await; db.insert(*nix_path.digest(), path_info.clone()); Ok(path_info) @@ -44,14 +43,15 @@ impl PathInfoService for MemoryPathInfoService { } fn list(&self) -> BoxStream<'static, Result> { - let db = self.db.read().unwrap(); + let db = self.db.clone(); - // Copy all elements into a list. - // This is a bit ugly, because we can't have db escape the lifetime - // of this function, but elements need to be returned owned anyways, and this in- - // memory impl is only for testing purposes anyways. - let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); + Box::pin(try_stream! { + let db = db.read().await; + let it = db.iter(); - Box::pin(iter(items)) + for (_k, v) in it { + yield v.clone() + } + }) } } -- cgit 1.4.1