about summary refs log tree commit diff
path: root/users/flokli/nixos/archeology-ec2
diff options
context:
space:
mode:
Diffstat (limited to 'users/flokli/nixos/archeology-ec2')
-rw-r--r--users/flokli/nixos/archeology-ec2/OWNERS1
-rw-r--r--users/flokli/nixos/archeology-ec2/configuration.nix35
-rw-r--r--users/flokli/nixos/archeology-ec2/hardware-configuration.nix36
-rw-r--r--users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py62
4 files changed, 134 insertions, 0 deletions
diff --git a/users/flokli/nixos/archeology-ec2/OWNERS b/users/flokli/nixos/archeology-ec2/OWNERS
new file mode 100644
index 0000000000..b9bc074a80
--- /dev/null
+++ b/users/flokli/nixos/archeology-ec2/OWNERS
@@ -0,0 +1 @@
+edef
diff --git a/users/flokli/nixos/archeology-ec2/configuration.nix b/users/flokli/nixos/archeology-ec2/configuration.nix
new file mode 100644
index 0000000000..f0fc0c5d09
--- /dev/null
+++ b/users/flokli/nixos/archeology-ec2/configuration.nix
@@ -0,0 +1,35 @@
+{ depot, pkgs, modulesPath, ... }:
+
+{
+  imports = [
+    "${modulesPath}/virtualisation/amazon-image.nix"
+    ../profiles/archeology.nix
+  ];
+
+  systemd.timers.parse-bucket-logs = {
+    wantedBy = [ "multi-user.target" ];
+    timerConfig.OnCalendar = "*-*-* 03:00:00 UTC";
+  };
+
+  systemd.services.parse-bucket-logs = {
+    path = [ depot.users.flokli.archeology.parse-bucket-logs ];
+    serviceConfig = {
+      Type = "oneshot";
+      ExecStart = (pkgs.writers.writePython3 "parse-bucket-logs-continuously"
+        {
+          libraries = [ pkgs.python3Packages.boto3 ];
+        } ./parse-bucket-logs-continuously.py);
+      DynamicUser = "yes";
+      StateDirectory = "parse-bucket-logs";
+    };
+  };
+
+  environment.systemPackages = [
+    depot.users.flokli.archeology.parse-bucket-logs
+  ];
+
+  networking.hostName = "archeology-ec2";
+
+  system.stateVersion = "23.05"; # Did you read the comment?
+}
+
diff --git a/users/flokli/nixos/archeology-ec2/hardware-configuration.nix b/users/flokli/nixos/archeology-ec2/hardware-configuration.nix
new file mode 100644
index 0000000000..7b3d79d70a
--- /dev/null
+++ b/users/flokli/nixos/archeology-ec2/hardware-configuration.nix
@@ -0,0 +1,36 @@
+{ lib, modulesPath, ... }:
+
+{
+  imports =
+    [
+      (modulesPath + "/profiles/qemu-guest.nix")
+    ];
+
+  boot.initrd.availableKernelModules = [ "ahci" "xhci_pci" "virtio_pci" "sr_mod" "virtio_blk" ];
+  boot.initrd.kernelModules = [ ];
+  boot.kernelModules = [ "kvm-amd" ];
+  boot.extraModulePackages = [ ];
+
+  fileSystems."/" =
+    {
+      device = "/dev/disk/by-partlabel/root";
+      fsType = "xfs";
+    };
+
+  fileSystems."/boot" =
+    {
+      device = "/dev/disk/by-partlabel/boot";
+      fsType = "vfat";
+    };
+
+  swapDevices = [ ];
+
+  # Enables DHCP on each ethernet and wireless interface. In case of scripted networking
+  # (the default) this is the recommended approach. When using systemd-networkd it's
+  # still possible to use this option, but it's recommended to use it in conjunction
+  # with explicit per-interface declarations with `networking.interfaces.<interface>.useDHCP`.
+  networking.useDHCP = lib.mkDefault true;
+  # networking.interfaces.enp1s0.useDHCP = lib.mkDefault true;
+
+  nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";
+}
diff --git a/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py b/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py
new file mode 100644
index 0000000000..f6ec8fb77c
--- /dev/null
+++ b/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py
@@ -0,0 +1,62 @@
+import boto3
+import datetime
+import os
+import re
+import subprocess
+import tempfile
+
+s3 = boto3.resource('s3')
+bucket_name = "nix-archeologist"
+prefix = "nix-cache-bucket-logs/"
+
+bucket = s3.Bucket(bucket_name)
+
+key_pattern = re.compile(r'.*\/(?P<y>\d{4})-(?P<m>\d{2})-(?P<d>\d{2})\.parquet$')  # noqa: E501
+
+# get a listing (which is sorted), grab the most recent key
+last_elem = list(
+    o for o in bucket.objects.filter(Prefix=prefix)
+    if key_pattern.match(o.key)
+).pop()
+
+# extract the date of that key.
+m = key_pattern.search(last_elem.key)
+last_elem_date = datetime.date(int(m.group("y")), int(m.group("m")), int(m.group("d")))  # noqa: E501
+
+# get the current date (UTC)
+now = datetime.datetime.now(tz=datetime.UTC)
+now_date = datetime.date(now.year, now.month, now.day)
+
+while True:
+    # Calculate what date would be processed next.
+    next_elem_date = last_elem_date + datetime.timedelta(days=1)
+
+    # If that's today, we don't want to process it.
+    if next_elem_date == now_date:
+        print("Caught up, would process data from today.")
+        break
+
+    # If we'd be processing data from yesterday, but it's right after midnight,
+    # also don't process - data might still be flushed.
+    if (next_elem_date + datetime.timedelta(days=1) == now_date) and now.hour == 0:  # noqa: E501
+        print("Not processing data from previous day right after midnight")
+        break
+
+    src = f"http://nix-cache-log.s3.amazonaws.com/log/{next_elem_date.isoformat()}-*"  # noqa: E501
+
+    # Invoke parse-bucket-logs script inside a tempdir and upload on success.
+    with tempfile.TemporaryDirectory() as td:
+        work_file_name = os.path.join(td, "output.parquet")
+        args = ["archeology-parse-bucket-logs", src, work_file_name]
+        subprocess.run(
+            args,
+            check=True  # throw exception if nonzero exit code
+        )
+
+        dest_key = f"{prefix}{next_elem_date.isoformat()}.parquet"
+
+        # Upload the file
+        print(f"uploading to s3://{bucket_name}{dest_key}")
+        bucket.upload_file(work_file_name, dest_key)
+
+    last_elem_date = next_elem_date