diff options
Diffstat (limited to 'users/flokli/nixos/archeology-ec2')
4 files changed, 134 insertions, 0 deletions
diff --git a/users/flokli/nixos/archeology-ec2/OWNERS b/users/flokli/nixos/archeology-ec2/OWNERS new file mode 100644 index 0000000000..b9bc074a80 --- /dev/null +++ b/users/flokli/nixos/archeology-ec2/OWNERS @@ -0,0 +1 @@ +edef diff --git a/users/flokli/nixos/archeology-ec2/configuration.nix b/users/flokli/nixos/archeology-ec2/configuration.nix new file mode 100644 index 0000000000..f0fc0c5d09 --- /dev/null +++ b/users/flokli/nixos/archeology-ec2/configuration.nix @@ -0,0 +1,35 @@ +{ depot, pkgs, modulesPath, ... }: + +{ + imports = [ + "${modulesPath}/virtualisation/amazon-image.nix" + ../profiles/archeology.nix + ]; + + systemd.timers.parse-bucket-logs = { + wantedBy = [ "multi-user.target" ]; + timerConfig.OnCalendar = "*-*-* 03:00:00 UTC"; + }; + + systemd.services.parse-bucket-logs = { + path = [ depot.users.flokli.archeology.parse-bucket-logs ]; + serviceConfig = { + Type = "oneshot"; + ExecStart = (pkgs.writers.writePython3 "parse-bucket-logs-continuously" + { + libraries = [ pkgs.python3Packages.boto3 ]; + } ./parse-bucket-logs-continuously.py); + DynamicUser = "yes"; + StateDirectory = "parse-bucket-logs"; + }; + }; + + environment.systemPackages = [ + depot.users.flokli.archeology.parse-bucket-logs + ]; + + networking.hostName = "archeology-ec2"; + + system.stateVersion = "23.05"; # Did you read the comment? +} + diff --git a/users/flokli/nixos/archeology-ec2/hardware-configuration.nix b/users/flokli/nixos/archeology-ec2/hardware-configuration.nix new file mode 100644 index 0000000000..7b3d79d70a --- /dev/null +++ b/users/flokli/nixos/archeology-ec2/hardware-configuration.nix @@ -0,0 +1,36 @@ +{ lib, modulesPath, ... }: + +{ + imports = + [ + (modulesPath + "/profiles/qemu-guest.nix") + ]; + + boot.initrd.availableKernelModules = [ "ahci" "xhci_pci" "virtio_pci" "sr_mod" "virtio_blk" ]; + boot.initrd.kernelModules = [ ]; + boot.kernelModules = [ "kvm-amd" ]; + boot.extraModulePackages = [ ]; + + fileSystems."/" = + { + device = "/dev/disk/by-partlabel/root"; + fsType = "xfs"; + }; + + fileSystems."/boot" = + { + device = "/dev/disk/by-partlabel/boot"; + fsType = "vfat"; + }; + + swapDevices = [ ]; + + # Enables DHCP on each ethernet and wireless interface. In case of scripted networking + # (the default) this is the recommended approach. When using systemd-networkd it's + # still possible to use this option, but it's recommended to use it in conjunction + # with explicit per-interface declarations with `networking.interfaces.<interface>.useDHCP`. + networking.useDHCP = lib.mkDefault true; + # networking.interfaces.enp1s0.useDHCP = lib.mkDefault true; + + nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux"; +} diff --git a/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py b/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py new file mode 100644 index 0000000000..f6ec8fb77c --- /dev/null +++ b/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py @@ -0,0 +1,62 @@ +import boto3 +import datetime +import os +import re +import subprocess +import tempfile + +s3 = boto3.resource('s3') +bucket_name = "nix-archeologist" +prefix = "nix-cache-bucket-logs/" + +bucket = s3.Bucket(bucket_name) + +key_pattern = re.compile(r'.*\/(?P<y>\d{4})-(?P<m>\d{2})-(?P<d>\d{2})\.parquet$') # noqa: E501 + +# get a listing (which is sorted), grab the most recent key +last_elem = list( + o for o in bucket.objects.filter(Prefix=prefix) + if key_pattern.match(o.key) +).pop() + +# extract the date of that key. +m = key_pattern.search(last_elem.key) +last_elem_date = datetime.date(int(m.group("y")), int(m.group("m")), int(m.group("d"))) # noqa: E501 + +# get the current date (UTC) +now = datetime.datetime.now(tz=datetime.UTC) +now_date = datetime.date(now.year, now.month, now.day) + +while True: + # Calculate what date would be processed next. + next_elem_date = last_elem_date + datetime.timedelta(days=1) + + # If that's today, we don't want to process it. + if next_elem_date == now_date: + print("Caught up, would process data from today.") + break + + # If we'd be processing data from yesterday, but it's right after midnight, + # also don't process - data might still be flushed. + if (next_elem_date + datetime.timedelta(days=1) == now_date) and now.hour == 0: # noqa: E501 + print("Not processing data from previous day right after midnight") + break + + src = f"http://nix-cache-log.s3.amazonaws.com/log/{next_elem_date.isoformat()}-*" # noqa: E501 + + # Invoke parse-bucket-logs script inside a tempdir and upload on success. + with tempfile.TemporaryDirectory() as td: + work_file_name = os.path.join(td, "output.parquet") + args = ["archeology-parse-bucket-logs", src, work_file_name] + subprocess.run( + args, + check=True # throw exception if nonzero exit code + ) + + dest_key = f"{prefix}{next_elem_date.isoformat()}.parquet" + + # Upload the file + print(f"uploading to s3://{bucket_name}{dest_key}") + bucket.upload_file(work_file_name, dest_key) + + last_elem_date = next_elem_date |