diff options
Diffstat (limited to 'tvix/build/src/buildservice/oci.rs')
-rw-r--r-- | tvix/build/src/buildservice/oci.rs | 245 |
1 files changed, 245 insertions, 0 deletions
diff --git a/tvix/build/src/buildservice/oci.rs b/tvix/build/src/buildservice/oci.rs new file mode 100644 index 000000000000..26e6f5027f49 --- /dev/null +++ b/tvix/build/src/buildservice/oci.rs @@ -0,0 +1,245 @@ +use anyhow::Context; +use bstr::BStr; +use oci_spec::runtime::{LinuxIdMapping, LinuxIdMappingBuilder}; +use tokio::process::{Child, Command}; +use tonic::async_trait; +use tracing::{debug, instrument, warn, Span}; +use tvix_castore::{ + blobservice::BlobService, directoryservice::DirectoryService, fs::fuse::FuseDaemon, + import::fs::ingest_path, Node, PathComponent, +}; +use uuid::Uuid; + +use crate::{ + oci::{get_host_output_paths, make_bundle, make_spec}, + proto::{Build, BuildRequest}, +}; +use std::{collections::BTreeMap, ffi::OsStr, path::PathBuf, process::Stdio}; + +use super::BuildService; + +const SANDBOX_SHELL: &str = env!("TVIX_BUILD_SANDBOX_SHELL"); +const MAX_CONCURRENT_BUILDS: usize = 2; // TODO: make configurable + +pub struct OCIBuildService<BS, DS> { + /// Root path in which all bundles are created in + bundle_root: PathBuf, + + /// uid mappings to set up for the workloads + uid_mappings: Vec<LinuxIdMapping>, + /// uid mappings to set up for the workloads + gid_mappings: Vec<LinuxIdMapping>, + + /// Handle to a [BlobService], used by filesystems spawned during builds. + blob_service: BS, + /// Handle to a [DirectoryService], used by filesystems spawned during builds. + directory_service: DS, + + // semaphore to track number of concurrently running builds. + // this is necessary, as otherwise we very quickly run out of open file handles. + concurrent_builds: tokio::sync::Semaphore, +} + +impl<BS, DS> OCIBuildService<BS, DS> { + pub fn new(bundle_root: PathBuf, blob_service: BS, directory_service: DS) -> Self { + // We map root inside the container to the uid/gid this is running at, + // and allocate one for uid 1000 into the container from the range we + // got in /etc/sub{u,g}id. + // TODO: actually read uid, and /etc/subuid. Maybe only when we try to build? + // FUTUREWORK: use different uids? + Self { + bundle_root, + blob_service, + directory_service, + uid_mappings: vec![ + LinuxIdMappingBuilder::default() + .host_id(1000_u32) + .container_id(0_u32) + .size(1_u32) + .build() + .unwrap(), + LinuxIdMappingBuilder::default() + .host_id(100000_u32) + .container_id(1000_u32) + .size(1_u32) + .build() + .unwrap(), + ], + gid_mappings: vec![ + LinuxIdMappingBuilder::default() + .host_id(100_u32) + .container_id(0_u32) + .size(1_u32) + .build() + .unwrap(), + LinuxIdMappingBuilder::default() + .host_id(100000_u32) + .container_id(100_u32) + .size(1_u32) + .build() + .unwrap(), + ], + concurrent_builds: tokio::sync::Semaphore::new(MAX_CONCURRENT_BUILDS), + } + } +} + +#[async_trait] +impl<BS, DS> BuildService for OCIBuildService<BS, DS> +where + BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static, + DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static, +{ + #[instrument(skip_all, err)] + async fn do_build(&self, request: BuildRequest) -> std::io::Result<Build> { + let _permit = self.concurrent_builds.acquire().await.unwrap(); + + let bundle_name = Uuid::new_v4(); + let bundle_path = self.bundle_root.join(bundle_name.to_string()); + + let span = Span::current(); + span.record("bundle_name", bundle_name.to_string()); + + let mut runtime_spec = make_spec(&request, true, SANDBOX_SHELL) + .context("failed to create spec") + .map_err(std::io::Error::other)?; + + let mut linux = runtime_spec.linux().clone().unwrap(); + + // edit the spec, we need to setup uid/gid mappings. + linux.set_uid_mappings(Some(self.uid_mappings.clone())); + linux.set_gid_mappings(Some(self.gid_mappings.clone())); + + runtime_spec.set_linux(Some(linux)); + + make_bundle(&request, &runtime_spec, &bundle_path) + .context("failed to produce bundle") + .map_err(std::io::Error::other)?; + + // pre-calculate the locations we want to later ingest, in the order of + // the original outputs. + // If we can't find calculate that path, don't start the build in first place. + let host_output_paths = get_host_output_paths(&request, &bundle_path) + .context("failed to calculate host output paths") + .map_err(std::io::Error::other)?; + + // NOTE: impl Drop for FuseDaemon unmounts, so if the call is cancelled, umount. + let _fuse_daemon = tokio::task::spawn_blocking({ + let blob_service = self.blob_service.clone(); + let directory_service = self.directory_service.clone(); + // assemble a BTreeMap of Nodes to pass into TvixStoreFs. + let root_nodes: BTreeMap<PathComponent, Node> = + BTreeMap::from_iter(request.inputs.iter().map(|input| { + // We know from validation this is Some. + input.clone().into_name_and_node().unwrap() + })); + + debug!(inputs=?root_nodes.keys(), "got inputs"); + + let dest = bundle_path.join("inputs"); + + move || { + let fs = tvix_castore::fs::TvixStoreFs::new( + blob_service, + directory_service, + Box::new(root_nodes), + true, + false, + ); + // mount the filesystem and wait for it to be unmounted. + // FUTUREWORK: make fuse daemon threads configurable? + FuseDaemon::new(fs, dest, 4, true).context("failed to start fuse daemon") + } + }) + .await? + .context("mounting") + .map_err(std::io::Error::other)?; + + debug!(bundle.path=?bundle_path, bundle.name=%bundle_name, "about to spawn bundle"); + + // start the bundle as another process. + let child = spawn_bundle(bundle_path, &bundle_name.to_string())?; + + // wait for the process to exit + // FUTUREWORK: change the trait to allow reporting progress / logs… + let child_output = child + .wait_with_output() + .await + .context("failed to run process") + .map_err(std::io::Error::other)?; + + // Check the exit code + if !child_output.status.success() { + let stdout = BStr::new(&child_output.stdout); + let stderr = BStr::new(&child_output.stderr); + + warn!(stdout=%stdout, stderr=%stderr, exit_code=%child_output.status, "build failed"); + + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "nonzero exit code".to_string(), + )); + } + + // Ingest build outputs into the castore. + // We use try_join_all here. No need to spawn new tasks, as this is + // mostly IO bound. + let outputs = futures::future::try_join_all(host_output_paths.into_iter().enumerate().map( + |(i, p)| { + let output_path = request.outputs[i].clone(); + async move { + debug!(host.path=?p, output.path=?output_path, "ingesting path"); + + let output_node = ingest_path::<_, _, _, &[u8]>( + self.blob_service.clone(), + &self.directory_service, + p, + None, + ) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Unable to ingest output: {}", e), + ) + })?; + + Ok::<_, std::io::Error>(tvix_castore::proto::Node::from_name_and_node( + "".into(), + output_node, + )) + } + }, + )) + .await?; + + Ok(Build { + build_request: Some(request.clone()), + outputs, + outputs_needles: vec![], // TODO refscanning + }) + } +} + +/// Spawns runc with the bundle at bundle_path. +/// On success, returns the child. +#[instrument(err)] +fn spawn_bundle( + bundle_path: impl AsRef<OsStr> + std::fmt::Debug, + bundle_name: &str, +) -> std::io::Result<Child> { + let mut command = Command::new("runc"); + + command + .args(&[ + "run".into(), + "--bundle".into(), + bundle_path.as_ref().to_os_string(), + bundle_name.into(), + ]) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .stdin(Stdio::null()); + + command.spawn() +} |