diff options
author | Florian Klink <flokli@flokli.de> | 2024-04-23T14·20+0300 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-04-23T14·53+0000 |
commit | e18bc33529eded5375564b6271ffab24ca25e196 (patch) | |
tree | ec2cbdff5fa580aa54eb85dee61a785691e03947 /tvix/glue/src | |
parent | 72d3f9b91476728e0258d58ce2d15f6ad9662543 (diff) |
fix(tvix/glue/tvix_store_io): remove early return r/7998
Doing the fetch comes up with the root node, but we still need to descend from there to the desired subpath. Move things around to ensure the fetch case also only sets root_node. This logic should probably be moved into smaller, easier to consume functions. Change-Id: I6ab9317df794f53d2504029bbc77859e89fef1ed Reviewed-on: https://cl.tvl.fyi/c/depot/+/11507 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/glue/src')
-rw-r--r-- | tvix/glue/src/tvix_store_io.rs | 323 |
1 files changed, 166 insertions, 157 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index c7133e1d1086..1f709906de7a 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -131,167 +131,176 @@ impl TvixStoreIO { .borrow() .get_fetch_for_output_path(store_path); - if let Some((name, fetch)) = maybe_fetch { - info!(?fetch, "triggering lazy fetch"); - let (sp, root_node) = self - .fetcher - .ingest_and_persist(&name, fetch) - .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; - - debug_assert_eq!( - sp.to_string(), - store_path.to_string(), - "store path returned from fetcher should match" - ); - - return Ok(Some(root_node)); - } - - // Look up the derivation for this output path. - let (drv_path, drv) = { - let known_paths = self.known_paths.borrow(); - match known_paths.get_drv_path_for_output_path(store_path) { - Some(drv_path) => ( - drv_path.to_owned(), - known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(), - ), - None => { - warn!(store_path=%store_path, "no drv found"); - // let StdIO take over - return Ok(None); - } + match maybe_fetch { + Some((name, fetch)) => { + info!(?fetch, "triggering lazy fetch"); + let (sp, root_node) = self + .fetcher + .ingest_and_persist(&name, fetch) + .await + .map_err(|e| { + std::io::Error::new(std::io::ErrorKind::InvalidData, e) + })?; + + debug_assert_eq!( + sp.to_string(), + store_path.to_string(), + "store path returned from fetcher should match" + ); + + root_node } - }; - - warn!("triggering build"); - - // derivation_to_build_request needs castore nodes for all inputs. - // Provide them, which means, here is where we recursively build - // all dependencies. - #[allow(clippy::mutable_key_type)] - let input_nodes: BTreeSet<Node> = - futures::stream::iter(drv.input_derivations.iter()) - .map(|(input_drv_path, output_names)| { - // look up the derivation object - let input_drv = { - let known_paths = self.known_paths.borrow(); - known_paths - .get_drv_by_drvpath(input_drv_path) - .unwrap_or_else(|| panic!("{} not found", input_drv_path)) - .to_owned() + None => { + // Look up the derivation for this output path. + let (drv_path, drv) = { + let known_paths = self.known_paths.borrow(); + match known_paths.get_drv_path_for_output_path(store_path) { + Some(drv_path) => ( + drv_path.to_owned(), + known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(), + ), + None => { + warn!(store_path=%store_path, "no drv found"); + // let StdIO take over + return Ok(None); + } + } + }; + + warn!("triggering build"); + + // derivation_to_build_request needs castore nodes for all inputs. + // Provide them, which means, here is where we recursively build + // all dependencies. + #[allow(clippy::mutable_key_type)] + let input_nodes: BTreeSet<Node> = + futures::stream::iter(drv.input_derivations.iter()) + .map(|(input_drv_path, output_names)| { + // look up the derivation object + let input_drv = { + let known_paths = self.known_paths.borrow(); + known_paths + .get_drv_by_drvpath(input_drv_path) + .unwrap_or_else(|| { + panic!("{} not found", input_drv_path) + }) + .to_owned() + }; + + // convert output names to actual paths + let output_paths: Vec<StorePath> = output_names + .iter() + .map(|output_name| { + input_drv + .outputs + .get(output_name) + .expect("missing output_name") + .path + .as_ref() + .expect("missing output path") + .clone() + }) + .collect(); + // For each output, ask for the castore node. + // We're in a per-derivation context, so if they're + // not built yet they'll all get built together. + // If they don't need to build, we can however still + // substitute all in parallel (if they don't need to + // be built) - so we turn this into a stream of streams. + // It's up to the builder to deduplicate same build requests. + futures::stream::iter(output_paths.into_iter()).map( + |output_path| async move { + let node = self + .store_path_to_node(&output_path, Path::new("")) + .await?; + + if let Some(node) = node { + Ok(node) + } else { + Err(io::Error::other("no node produced")) + } + }, + ) + }) + .flatten() + .buffer_unordered(10) // TODO: make configurable + .try_collect() + .await?; + + // TODO: check if input sources are sufficiently dealth with, + // I think yes, they must be imported into the store by other + // operations, so dealt with in the Some(…) match arm + + // synthesize the build request. + let build_request = derivation_to_build_request(&drv, input_nodes)?; + + // create a build + let build_result = self + .build_service + .as_ref() + .do_build(build_request) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + + // TODO: refscan? + + // For each output, insert a PathInfo. + for output in &build_result.outputs { + let root_node = output.node.as_ref().expect("invalid root node"); + + // calculate the nar representation + let (nar_size, nar_sha256) = + self.path_info_service.calculate_nar(root_node).await?; + + // assemble the PathInfo to persist + let path_info = PathInfo { + node: Some(tvix_castore::proto::Node { + node: Some(root_node.clone()), + }), + references: vec![], // TODO: refscan + narinfo: Some(tvix_store::proto::NarInfo { + nar_size, + nar_sha256: Bytes::from(nar_sha256.to_vec()), + signatures: vec![], + reference_names: vec![], // TODO: refscan + deriver: Some(tvix_store::proto::StorePath { + name: drv_path + .name() + .strip_suffix(".drv") + .expect("missing .drv suffix") + .to_string(), + digest: drv_path.digest().to_vec().into(), + }), + ca: drv.fod_digest().map( + |fod_digest| -> tvix_store::proto::nar_info::Ca { + (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256( + fod_digest, + ))) + .into() + }, + ), + }), }; - // convert output names to actual paths - let output_paths: Vec<StorePath> = output_names - .iter() - .map(|output_name| { - input_drv - .outputs - .get(output_name) - .expect("missing output_name") - .path - .as_ref() - .expect("missing output path") - .clone() - }) - .collect(); - // For each output, ask for the castore node. - // We're in a per-derivation context, so if they're - // not built yet they'll all get built together. - // If they don't need to build, we can however still - // substitute all in parallel (if they don't need to - // be built) - so we turn this into a stream of streams. - // It's up to the builder to deduplicate same build requests. - futures::stream::iter(output_paths.into_iter()).map( - |output_path| async move { - let node = self - .store_path_to_node(&output_path, Path::new("")) - .await?; - - if let Some(node) = node { - Ok(node) - } else { - Err(io::Error::other("no node produced")) - } - }, - ) - }) - .flatten() - .buffer_unordered(10) // TODO: make configurable - .try_collect() - .await?; - - // TODO: check if input sources are sufficiently dealth with, - // I think yes, they must be imported into the store by other - // operations, so dealt with in the Some(…) match arm - - // synthesize the build request. - let build_request = derivation_to_build_request(&drv, input_nodes)?; - - // create a build - let build_result = self - .build_service - .as_ref() - .do_build(build_request) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - - // TODO: refscan? - - // For each output, insert a PathInfo. - for output in &build_result.outputs { - let root_node = output.node.as_ref().expect("invalid root node"); - - // calculate the nar representation - let (nar_size, nar_sha256) = - self.path_info_service.calculate_nar(root_node).await?; - - // assemble the PathInfo to persist - let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(root_node.clone()), - }), - references: vec![], // TODO: refscan - narinfo: Some(tvix_store::proto::NarInfo { - nar_size, - nar_sha256: Bytes::from(nar_sha256.to_vec()), - signatures: vec![], - reference_names: vec![], // TODO: refscan - deriver: Some(tvix_store::proto::StorePath { - name: drv_path - .name() - .strip_suffix(".drv") - .expect("missing .drv suffix") - .to_string(), - digest: drv_path.digest().to_vec().into(), - }), - ca: drv.fod_digest().map( - |fod_digest| -> tvix_store::proto::nar_info::Ca { - (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest))) - .into() - }, - ), - }), - }; - - self.path_info_service - .put(path_info) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - } + self.path_info_service + .put(path_info) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + } - // find the output for the store path requested - build_result - .outputs - .into_iter() - .find(|output_node| { - output_node.node.as_ref().expect("invalid node").get_name() - == store_path.to_string().as_bytes() - }) - .expect("build didn't produce the store path") - .node - .expect("invalid node") + // find the output for the store path requested + build_result + .outputs + .into_iter() + .find(|output_node| { + output_node.node.as_ref().expect("invalid node").get_name() + == store_path.to_string().as_bytes() + }) + .expect("build didn't produce the store path") + .node + .expect("invalid node") + } + } } }; |