 tvix/glue/src/tvix_store_io.rs | 323
 1 file changed, 166 insertions, 157 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index c7133e1d1086..1f709906de7a 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -131,167 +131,176 @@ impl TvixStoreIO {
                     .borrow()
                     .get_fetch_for_output_path(store_path);
 
-                if let Some((name, fetch)) = maybe_fetch {
-                    info!(?fetch, "triggering lazy fetch");
-                    let (sp, root_node) = self
-                        .fetcher
-                        .ingest_and_persist(&name, fetch)
-                        .await
-                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
-
-                    debug_assert_eq!(
-                        sp.to_string(),
-                        store_path.to_string(),
-                        "store path returned from fetcher should match"
-                    );
-
-                    return Ok(Some(root_node));
-                }
-
-                // Look up the derivation for this output path.
-                let (drv_path, drv) = {
-                    let known_paths = self.known_paths.borrow();
-                    match known_paths.get_drv_path_for_output_path(store_path) {
-                        Some(drv_path) => (
-                            drv_path.to_owned(),
-                            known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(),
-                        ),
-                        None => {
-                            warn!(store_path=%store_path, "no drv found");
-                            // let StdIO take over
-                            return Ok(None);
-                        }
+                match maybe_fetch {
+                    Some((name, fetch)) => {
+                        info!(?fetch, "triggering lazy fetch");
+                        let (sp, root_node) = self
+                            .fetcher
+                            .ingest_and_persist(&name, fetch)
+                            .await
+                            .map_err(|e| {
+                                std::io::Error::new(std::io::ErrorKind::InvalidData, e)
+                            })?;
+
+                        debug_assert_eq!(
+                            sp.to_string(),
+                            store_path.to_string(),
+                            "store path returned from fetcher should match"
+                        );
+
+                        root_node
                     }
-                };
-
-                warn!("triggering build");
-
-                // derivation_to_build_request needs castore nodes for all inputs.
-                // Provide them, which means, here is where we recursively build
-                // all dependencies.
-                #[allow(clippy::mutable_key_type)]
-                let input_nodes: BTreeSet<Node> =
-                    futures::stream::iter(drv.input_derivations.iter())
-                        .map(|(input_drv_path, output_names)| {
-                            // look up the derivation object
-                            let input_drv = {
-                                let known_paths = self.known_paths.borrow();
-                                known_paths
-                                    .get_drv_by_drvpath(input_drv_path)
-                                    .unwrap_or_else(|| panic!("{} not found", input_drv_path))
-                                    .to_owned()
+                    None => {
+                        // Look up the derivation for this output path.
+                        let (drv_path, drv) = {
+                            let known_paths = self.known_paths.borrow();
+                            match known_paths.get_drv_path_for_output_path(store_path) {
+                                Some(drv_path) => (
+                                    drv_path.to_owned(),
+                                    known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(),
+                                ),
+                                None => {
+                                    warn!(store_path=%store_path, "no drv found");
+                                    // let StdIO take over
+                                    return Ok(None);
+                                }
+                            }
+                        };
+
+                        warn!("triggering build");
+
+                        // derivation_to_build_request needs castore nodes for all inputs.
+                        // Provide them, which means, here is where we recursively build
+                        // all dependencies.
+                        #[allow(clippy::mutable_key_type)]
+                        let input_nodes: BTreeSet<Node> =
+                            futures::stream::iter(drv.input_derivations.iter())
+                                .map(|(input_drv_path, output_names)| {
+                                    // look up the derivation object
+                                    let input_drv = {
+                                        let known_paths = self.known_paths.borrow();
+                                        known_paths
+                                            .get_drv_by_drvpath(input_drv_path)
+                                            .unwrap_or_else(|| {
+                                                panic!("{} not found", input_drv_path)
+                                            })
+                                            .to_owned()
+                                    };
+
+                                    // convert output names to actual paths
+                                    let output_paths: Vec<StorePath> = output_names
+                                        .iter()
+                                        .map(|output_name| {
+                                            input_drv
+                                                .outputs
+                                                .get(output_name)
+                                                .expect("missing output_name")
+                                                .path
+                                                .as_ref()
+                                                .expect("missing output path")
+                                                .clone()
+                                        })
+                                        .collect();
+                                    // For each output, ask for the castore node.
+                                    // We're in a per-derivation context, so if they're
+                                    // not built yet they'll all get built together.
+                                    // If they don't need to build, we can however still
+                                    // substitute all in parallel (if they don't need to
+                                    // be built) - so we turn this into a stream of streams.
+                                    // It's up to the builder to deduplicate same build requests.
+                                    futures::stream::iter(output_paths.into_iter()).map(
+                                        |output_path| async move {
+                                            let node = self
+                                                .store_path_to_node(&output_path, Path::new(""))
+                                                .await?;
+
+                                            if let Some(node) = node {
+                                                Ok(node)
+                                            } else {
+                                                Err(io::Error::other("no node produced"))
+                                            }
+                                        },
+                                    )
+                                })
+                                .flatten()
+                                .buffer_unordered(10) // TODO: make configurable
+                                .try_collect()
+                                .await?;
+
+                        // TODO: check if input sources are sufficiently dealth with,
+                        // I think yes, they must be imported into the store by other
+                        // operations, so dealt with in the Some(…) match arm
+
+                        // synthesize the build request.
+                        let build_request = derivation_to_build_request(&drv, input_nodes)?;
+
+                        // create a build
+                        let build_result = self
+                            .build_service
+                            .as_ref()
+                            .do_build(build_request)
+                            .await
+                            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+
+                        // TODO: refscan?
+
+                        // For each output, insert a PathInfo.
+                        for output in &build_result.outputs {
+                            let root_node = output.node.as_ref().expect("invalid root node");
+
+                            // calculate the nar representation
+                            let (nar_size, nar_sha256) =
+                                self.path_info_service.calculate_nar(root_node).await?;
+
+                            // assemble the PathInfo to persist
+                            let path_info = PathInfo {
+                                node: Some(tvix_castore::proto::Node {
+                                    node: Some(root_node.clone()),
+                                }),
+                                references: vec![], // TODO: refscan
+                                narinfo: Some(tvix_store::proto::NarInfo {
+                                    nar_size,
+                                    nar_sha256: Bytes::from(nar_sha256.to_vec()),
+                                    signatures: vec![],
+                                    reference_names: vec![], // TODO: refscan
+                                    deriver: Some(tvix_store::proto::StorePath {
+                                        name: drv_path
+                                            .name()
+                                            .strip_suffix(".drv")
+                                            .expect("missing .drv suffix")
+                                            .to_string(),
+                                        digest: drv_path.digest().to_vec().into(),
+                                    }),
+                                    ca: drv.fod_digest().map(
+                                        |fod_digest| -> tvix_store::proto::nar_info::Ca {
+                                            (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(
+                                                fod_digest,
+                                            )))
+                                            .into()
+                                        },
+                                    ),
+                                }),
                             };
 
-                            // convert output names to actual paths
-                            let output_paths: Vec<StorePath> = output_names
-                                .iter()
-                                .map(|output_name| {
-                                    input_drv
-                                        .outputs
-                                        .get(output_name)
-                                        .expect("missing output_name")
-                                        .path
-                                        .as_ref()
-                                        .expect("missing output path")
-                                        .clone()
-                                })
-                                .collect();
-                            // For each output, ask for the castore node.
-                            // We're in a per-derivation context, so if they're
-                            // not built yet they'll all get built together.
-                            // If they don't need to build, we can however still
-                            // substitute all in parallel (if they don't need to
-                            // be built) - so we turn this into a stream of streams.
-                            // It's up to the builder to deduplicate same build requests.
-                            futures::stream::iter(output_paths.into_iter()).map(
-                                |output_path| async move {
-                                    let node = self
-                                        .store_path_to_node(&output_path, Path::new(""))
-                                        .await?;
-
-                                    if let Some(node) = node {
-                                        Ok(node)
-                                    } else {
-                                        Err(io::Error::other("no node produced"))
-                                    }
-                                },
-                            )
-                        })
-                        .flatten()
-                        .buffer_unordered(10) // TODO: make configurable
-                        .try_collect()
-                        .await?;
-
-                // TODO: check if input sources are sufficiently dealth with,
-                // I think yes, they must be imported into the store by other
-                // operations, so dealt with in the Some(…) match arm
-
-                // synthesize the build request.
-                let build_request = derivation_to_build_request(&drv, input_nodes)?;
-
-                // create a build
-                let build_result = self
-                    .build_service
-                    .as_ref()
-                    .do_build(build_request)
-                    .await
-                    .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-
-                // TODO: refscan?
-
-                // For each output, insert a PathInfo.
-                for output in &build_result.outputs {
-                    let root_node = output.node.as_ref().expect("invalid root node");
-
-                    // calculate the nar representation
-                    let (nar_size, nar_sha256) =
-                        self.path_info_service.calculate_nar(root_node).await?;
-
-                    // assemble the PathInfo to persist
-                    let path_info = PathInfo {
-                        node: Some(tvix_castore::proto::Node {
-                            node: Some(root_node.clone()),
-                        }),
-                        references: vec![], // TODO: refscan
-                        narinfo: Some(tvix_store::proto::NarInfo {
-                            nar_size,
-                            nar_sha256: Bytes::from(nar_sha256.to_vec()),
-                            signatures: vec![],
-                            reference_names: vec![], // TODO: refscan
-                            deriver: Some(tvix_store::proto::StorePath {
-                                name: drv_path
-                                    .name()
-                                    .strip_suffix(".drv")
-                                    .expect("missing .drv suffix")
-                                    .to_string(),
-                                digest: drv_path.digest().to_vec().into(),
-                            }),
-                            ca: drv.fod_digest().map(
-                                |fod_digest| -> tvix_store::proto::nar_info::Ca {
-                                    (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest)))
-                                        .into()
-                                },
-                            ),
-                        }),
-                    };
-
-                    self.path_info_service
-                        .put(path_info)
-                        .await
-                        .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-                }
+                            self.path_info_service
+                                .put(path_info)
+                                .await
+                                .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+                        }
 
-                // find the output for the store path requested
-                build_result
-                    .outputs
-                    .into_iter()
-                    .find(|output_node| {
-                        output_node.node.as_ref().expect("invalid node").get_name()
-                            == store_path.to_string().as_bytes()
-                    })
-                    .expect("build didn't produce the store path")
-                    .node
-                    .expect("invalid node")
+                        // find the output for the store path requested
+                        build_result
+                            .outputs
+                            .into_iter()
+                            .find(|output_node| {
+                                output_node.node.as_ref().expect("invalid node").get_name()
+                                    == store_path.to_string().as_bytes()
+                            })
+                            .expect("build didn't produce the store path")
+                            .node
+                            .expect("invalid node")
+                    }
+                }
             }
         };
 