about summary refs log tree commit diff
path: root/tvix/build/src/buildservice
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/build/src/buildservice')
-rw-r--r--tvix/build/src/buildservice/from_addr.rs33
-rw-r--r--tvix/build/src/buildservice/mod.rs3
-rw-r--r--tvix/build/src/buildservice/oci.rs245
3 files changed, 279 insertions, 2 deletions
diff --git a/tvix/build/src/buildservice/from_addr.rs b/tvix/build/src/buildservice/from_addr.rs
index cc5403edefff..a7afba1138af 100644
--- a/tvix/build/src/buildservice/from_addr.rs
+++ b/tvix/build/src/buildservice/from_addr.rs
@@ -2,18 +2,22 @@ use super::{grpc::GRPCBuildService, BuildService, DummyBuildService};
 use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 use url::Url;
 
+#[cfg(target_os = "linux")]
+use super::oci::OCIBuildService;
+
 /// Constructs a new instance of a [BuildService] from an URI.
 ///
 /// The following schemes are supported by the following services:
 /// - `dummy://` ([DummyBuildService])
+/// - `oci://` ([OCIBuildService])
 /// - `grpc+*://` ([GRPCBuildService])
 ///
 /// As some of these [BuildService] need to talk to a [BlobService] and
 /// [DirectoryService], these also need to be passed in.
 pub async fn from_addr<BS, DS>(
     uri: &str,
-    _blob_service: BS,
-    _directory_service: DS,
+    blob_service: BS,
+    directory_service: DS,
 ) -> std::io::Result<Box<dyn BuildService>>
 where
     BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
@@ -25,6 +29,21 @@ where
     Ok(match url.scheme() {
         // dummy doesn't care about parameters.
         "dummy" => Box::<DummyBuildService>::default(),
+        #[cfg(target_os = "linux")]
+        "oci" => {
+            // oci wants a path in which it creates bundles.
+            if url.path().is_empty() {
+                Err(std::io::Error::other("oci needs a bundle dir as path"))?
+            }
+
+            // TODO: make sandbox shell and rootless_uid_gid
+
+            Box::new(OCIBuildService::new(
+                url.path().into(),
+                blob_service,
+                directory_service,
+            ))
+        }
         scheme => {
             if scheme.starts_with("grpc+") {
                 let client = crate::proto::build_service_client::BuildServiceClient::new(
@@ -50,12 +69,18 @@ mod tests {
     use std::sync::Arc;
 
     use super::from_addr;
+    use lazy_static::lazy_static;
     use rstest::rstest;
+    use tempfile::TempDir;
     use tvix_castore::{
         blobservice::{BlobService, MemoryBlobService},
         directoryservice::{DirectoryService, MemoryDirectoryService},
     };
 
+    lazy_static! {
+        static ref TMPDIR_OCI_1: TempDir = TempDir::new().unwrap();
+    }
+
     #[rstest]
     /// This uses an unsupported scheme.
     #[case::unsupported_scheme("http://foo.example/test", false)]
@@ -73,6 +98,10 @@ mod tests {
     #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)]
     /// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
     #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
+    /// This configures OCI, but doesn't specify the bundle path
+    #[case::oci_missing_bundle_dir("oci://", false)]
+    /// This configures OCI, specifying the bundle path
+    #[case::oci_bundle_path(&format!("oci://{}", TMPDIR_OCI_1.path().to_str().unwrap()), true)]
     #[tokio::test]
     async fn test_from_addr(#[case] uri_str: &str, #[case] exp_succeed: bool) {
         let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
diff --git a/tvix/build/src/buildservice/mod.rs b/tvix/build/src/buildservice/mod.rs
index a61d782919b9..cdc3cb2afcc3 100644
--- a/tvix/build/src/buildservice/mod.rs
+++ b/tvix/build/src/buildservice/mod.rs
@@ -6,6 +6,9 @@ mod dummy;
 mod from_addr;
 mod grpc;
 
+#[cfg(target_os = "linux")]
+mod oci;
+
 pub use dummy::DummyBuildService;
 pub use from_addr::from_addr;
 
diff --git a/tvix/build/src/buildservice/oci.rs b/tvix/build/src/buildservice/oci.rs
new file mode 100644
index 000000000000..26e6f5027f49
--- /dev/null
+++ b/tvix/build/src/buildservice/oci.rs
@@ -0,0 +1,245 @@
+use anyhow::Context;
+use bstr::BStr;
+use oci_spec::runtime::{LinuxIdMapping, LinuxIdMappingBuilder};
+use tokio::process::{Child, Command};
+use tonic::async_trait;
+use tracing::{debug, instrument, warn, Span};
+use tvix_castore::{
+    blobservice::BlobService, directoryservice::DirectoryService, fs::fuse::FuseDaemon,
+    import::fs::ingest_path, Node, PathComponent,
+};
+use uuid::Uuid;
+
+use crate::{
+    oci::{get_host_output_paths, make_bundle, make_spec},
+    proto::{Build, BuildRequest},
+};
+use std::{collections::BTreeMap, ffi::OsStr, path::PathBuf, process::Stdio};
+
+use super::BuildService;
+
+const SANDBOX_SHELL: &str = env!("TVIX_BUILD_SANDBOX_SHELL");
+const MAX_CONCURRENT_BUILDS: usize = 2; // TODO: make configurable
+
+pub struct OCIBuildService<BS, DS> {
+    /// Root path in which all bundles are created in
+    bundle_root: PathBuf,
+
+    /// uid mappings to set up for the workloads
+    uid_mappings: Vec<LinuxIdMapping>,
+    /// uid mappings to set up for the workloads
+    gid_mappings: Vec<LinuxIdMapping>,
+
+    /// Handle to a [BlobService], used by filesystems spawned during builds.
+    blob_service: BS,
+    /// Handle to a [DirectoryService], used by filesystems spawned during builds.
+    directory_service: DS,
+
+    // semaphore to track number of concurrently running builds.
+    // this is necessary, as otherwise we very quickly run out of open file handles.
+    concurrent_builds: tokio::sync::Semaphore,
+}
+
+impl<BS, DS> OCIBuildService<BS, DS> {
+    pub fn new(bundle_root: PathBuf, blob_service: BS, directory_service: DS) -> Self {
+        // We map root inside the container to the uid/gid this is running at,
+        // and allocate one for uid 1000 into the container from the range we
+        // got in /etc/sub{u,g}id.
+        // TODO: actually read uid, and /etc/subuid. Maybe only when we try to build?
+        // FUTUREWORK: use different uids?
+        Self {
+            bundle_root,
+            blob_service,
+            directory_service,
+            uid_mappings: vec![
+                LinuxIdMappingBuilder::default()
+                    .host_id(1000_u32)
+                    .container_id(0_u32)
+                    .size(1_u32)
+                    .build()
+                    .unwrap(),
+                LinuxIdMappingBuilder::default()
+                    .host_id(100000_u32)
+                    .container_id(1000_u32)
+                    .size(1_u32)
+                    .build()
+                    .unwrap(),
+            ],
+            gid_mappings: vec![
+                LinuxIdMappingBuilder::default()
+                    .host_id(100_u32)
+                    .container_id(0_u32)
+                    .size(1_u32)
+                    .build()
+                    .unwrap(),
+                LinuxIdMappingBuilder::default()
+                    .host_id(100000_u32)
+                    .container_id(100_u32)
+                    .size(1_u32)
+                    .build()
+                    .unwrap(),
+            ],
+            concurrent_builds: tokio::sync::Semaphore::new(MAX_CONCURRENT_BUILDS),
+        }
+    }
+}
+
+#[async_trait]
+impl<BS, DS> BuildService for OCIBuildService<BS, DS>
+where
+    BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
+    DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
+{
+    #[instrument(skip_all, err)]
+    async fn do_build(&self, request: BuildRequest) -> std::io::Result<Build> {
+        let _permit = self.concurrent_builds.acquire().await.unwrap();
+
+        let bundle_name = Uuid::new_v4();
+        let bundle_path = self.bundle_root.join(bundle_name.to_string());
+
+        let span = Span::current();
+        span.record("bundle_name", bundle_name.to_string());
+
+        let mut runtime_spec = make_spec(&request, true, SANDBOX_SHELL)
+            .context("failed to create spec")
+            .map_err(std::io::Error::other)?;
+
+        let mut linux = runtime_spec.linux().clone().unwrap();
+
+        // edit the spec, we need to setup uid/gid mappings.
+        linux.set_uid_mappings(Some(self.uid_mappings.clone()));
+        linux.set_gid_mappings(Some(self.gid_mappings.clone()));
+
+        runtime_spec.set_linux(Some(linux));
+
+        make_bundle(&request, &runtime_spec, &bundle_path)
+            .context("failed to produce bundle")
+            .map_err(std::io::Error::other)?;
+
+        // pre-calculate the locations we want to later ingest, in the order of
+        // the original outputs.
+        // If we can't find calculate that path, don't start the build in first place.
+        let host_output_paths = get_host_output_paths(&request, &bundle_path)
+            .context("failed to calculate host output paths")
+            .map_err(std::io::Error::other)?;
+
+        // NOTE: impl Drop for FuseDaemon unmounts, so if the call is cancelled, umount.
+        let _fuse_daemon = tokio::task::spawn_blocking({
+            let blob_service = self.blob_service.clone();
+            let directory_service = self.directory_service.clone();
+            // assemble a BTreeMap of Nodes to pass into TvixStoreFs.
+            let root_nodes: BTreeMap<PathComponent, Node> =
+                BTreeMap::from_iter(request.inputs.iter().map(|input| {
+                    // We know from validation this is Some.
+                    input.clone().into_name_and_node().unwrap()
+                }));
+
+            debug!(inputs=?root_nodes.keys(), "got inputs");
+
+            let dest = bundle_path.join("inputs");
+
+            move || {
+                let fs = tvix_castore::fs::TvixStoreFs::new(
+                    blob_service,
+                    directory_service,
+                    Box::new(root_nodes),
+                    true,
+                    false,
+                );
+                // mount the filesystem and wait for it to be unmounted.
+                // FUTUREWORK: make fuse daemon threads configurable?
+                FuseDaemon::new(fs, dest, 4, true).context("failed to start fuse daemon")
+            }
+        })
+        .await?
+        .context("mounting")
+        .map_err(std::io::Error::other)?;
+
+        debug!(bundle.path=?bundle_path, bundle.name=%bundle_name, "about to spawn bundle");
+
+        // start the bundle as another process.
+        let child = spawn_bundle(bundle_path, &bundle_name.to_string())?;
+
+        // wait for the process to exit
+        // FUTUREWORK: change the trait to allow reporting progress / logs…
+        let child_output = child
+            .wait_with_output()
+            .await
+            .context("failed to run process")
+            .map_err(std::io::Error::other)?;
+
+        // Check the exit code
+        if !child_output.status.success() {
+            let stdout = BStr::new(&child_output.stdout);
+            let stderr = BStr::new(&child_output.stderr);
+
+            warn!(stdout=%stdout, stderr=%stderr, exit_code=%child_output.status, "build failed");
+
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::Other,
+                "nonzero exit code".to_string(),
+            ));
+        }
+
+        // Ingest build outputs into the castore.
+        // We use try_join_all here. No need to spawn new tasks, as this is
+        // mostly IO bound.
+        let outputs = futures::future::try_join_all(host_output_paths.into_iter().enumerate().map(
+            |(i, p)| {
+                let output_path = request.outputs[i].clone();
+                async move {
+                    debug!(host.path=?p, output.path=?output_path, "ingesting path");
+
+                    let output_node = ingest_path::<_, _, _, &[u8]>(
+                        self.blob_service.clone(),
+                        &self.directory_service,
+                        p,
+                        None,
+                    )
+                    .await
+                    .map_err(|e| {
+                        std::io::Error::new(
+                            std::io::ErrorKind::InvalidData,
+                            format!("Unable to ingest output: {}", e),
+                        )
+                    })?;
+
+                    Ok::<_, std::io::Error>(tvix_castore::proto::Node::from_name_and_node(
+                        "".into(),
+                        output_node,
+                    ))
+                }
+            },
+        ))
+        .await?;
+
+        Ok(Build {
+            build_request: Some(request.clone()),
+            outputs,
+            outputs_needles: vec![], // TODO refscanning
+        })
+    }
+}
+
+/// Spawns runc with the bundle at bundle_path.
+/// On success, returns the child.
+#[instrument(err)]
+fn spawn_bundle(
+    bundle_path: impl AsRef<OsStr> + std::fmt::Debug,
+    bundle_name: &str,
+) -> std::io::Result<Child> {
+    let mut command = Command::new("runc");
+
+    command
+        .args(&[
+            "run".into(),
+            "--bundle".into(),
+            bundle_path.as_ref().to_os_string(),
+            bundle_name.into(),
+        ])
+        .stderr(Stdio::piped())
+        .stdout(Stdio::piped())
+        .stdin(Stdio::null());
+
+    command.spawn()
+}