about summary refs log tree commit diff
path: root/users
diff options
context:
space:
mode:
Diffstat (limited to 'users')
-rw-r--r--users/flokli/nixos/archeology-ec2/configuration.nix18
-rw-r--r--users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py62
2 files changed, 80 insertions, 0 deletions
diff --git a/users/flokli/nixos/archeology-ec2/configuration.nix b/users/flokli/nixos/archeology-ec2/configuration.nix
index af10771adc44..f0fc0c5d095c 100644
--- a/users/flokli/nixos/archeology-ec2/configuration.nix
+++ b/users/flokli/nixos/archeology-ec2/configuration.nix
@@ -6,6 +6,24 @@
     ../profiles/archeology.nix
   ];
 
+  systemd.timers.parse-bucket-logs = {
+    wantedBy = [ "multi-user.target" ];
+    timerConfig.OnCalendar = "*-*-* 03:00:00 UTC";
+  };
+
+  systemd.services.parse-bucket-logs = {
+    path = [ depot.users.flokli.archeology.parse-bucket-logs ];
+    serviceConfig = {
+      Type = "oneshot";
+      ExecStart = (pkgs.writers.writePython3 "parse-bucket-logs-continuously"
+        {
+          libraries = [ pkgs.python3Packages.boto3 ];
+        } ./parse-bucket-logs-continuously.py);
+      DynamicUser = "yes";
+      StateDirectory = "parse-bucket-logs";
+    };
+  };
+
   environment.systemPackages = [
     depot.users.flokli.archeology.parse-bucket-logs
   ];
diff --git a/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py b/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py
new file mode 100644
index 000000000000..f6ec8fb77cef
--- /dev/null
+++ b/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py
@@ -0,0 +1,62 @@
+import boto3
+import datetime
+import os
+import re
+import subprocess
+import tempfile
+
+s3 = boto3.resource('s3')
+bucket_name = "nix-archeologist"
+prefix = "nix-cache-bucket-logs/"
+
+bucket = s3.Bucket(bucket_name)
+
+key_pattern = re.compile(r'.*\/(?P<y>\d{4})-(?P<m>\d{2})-(?P<d>\d{2})\.parquet$')  # noqa: E501
+
+# get a listing (which is sorted), grab the most recent key
+last_elem = list(
+    o for o in bucket.objects.filter(Prefix=prefix)
+    if key_pattern.match(o.key)
+).pop()
+
+# extract the date of that key.
+m = key_pattern.search(last_elem.key)
+last_elem_date = datetime.date(int(m.group("y")), int(m.group("m")), int(m.group("d")))  # noqa: E501
+
+# get the current date (UTC)
+now = datetime.datetime.now(tz=datetime.UTC)
+now_date = datetime.date(now.year, now.month, now.day)
+
+while True:
+    # Calculate what date would be processed next.
+    next_elem_date = last_elem_date + datetime.timedelta(days=1)
+
+    # If that's today, we don't want to process it.
+    if next_elem_date == now_date:
+        print("Caught up, would process data from today.")
+        break
+
+    # If we'd be processing data from yesterday, but it's right after midnight,
+    # also don't process - data might still be flushed.
+    if (next_elem_date + datetime.timedelta(days=1) == now_date) and now.hour == 0:  # noqa: E501
+        print("Not processing data from previous day right after midnight")
+        break
+
+    src = f"http://nix-cache-log.s3.amazonaws.com/log/{next_elem_date.isoformat()}-*"  # noqa: E501
+
+    # Invoke parse-bucket-logs script inside a tempdir and upload on success.
+    with tempfile.TemporaryDirectory() as td:
+        work_file_name = os.path.join(td, "output.parquet")
+        args = ["archeology-parse-bucket-logs", src, work_file_name]
+        subprocess.run(
+            args,
+            check=True  # throw exception if nonzero exit code
+        )
+
+        dest_key = f"{prefix}{next_elem_date.isoformat()}.parquet"
+
+        # Upload the file
+        print(f"uploading to s3://{bucket_name}{dest_key}")
+        bucket.upload_file(work_file_name, dest_key)
+
+    last_elem_date = next_elem_date