apiVersion: torque.dev/v1
kind: Stack
name: firecracker-fraud-platform
cli:
  inferDeps: false
runner:
  concurrency: 1
nodes:
  # Boots the Firecracker-hosted k3s cluster on the lab host. The command writes
  # a helper script to ${RUN_ROOT}/fraud-k3s-lab.sh and runs it in "apply" mode;
  # deleteCommand runs the same script in "delete" mode.
  - name: fc-k8s-bootstrap
    kind: host.command.run
    host:
      transport: ssh
      # NOTE(review): presumably the name of the environment variable holding
      # the SSH target (the tunnel node reads TORQUE_LAB_SSH too) — confirm
      # against the runner's documentation.
      targetEnv: TORQUE_LAB_SSH
      timeout: 90m
      # The variables assigned at the top of the script are passed to the helper
      # script's environment when it is invoked; the helper carries the same
      # values as defaults.
      command: |
        set -euo pipefail
        RUN_ROOT="/var/lib/torque-firecracker-k8s/fraud-platform"
        NODE_COUNT="6"
        SUBNET_OCTET="250"
        BRIDGE_NAME="tqfcfraud"
        TAP_PREFIX="tqfrd"
        RUN_ID="fraud-platform"
        mkdir -p "${RUN_ROOT}"
        cat >"${RUN_ROOT}/fraud-k3s-lab.sh" <<'REMOTE'
        #!/usr/bin/env bash
        set -euo pipefail

        # First positional argument selects the operation; see the dispatch at the
        # bottom of the script (apply, public-apply, public-delete, delete|cleanup).
        mode="${1:-apply}"
        # Every setting below can be overridden through the environment; the value
        # after ":-" is used when the variable is unset or empty.
        RUN_ROOT="${RUN_ROOT:-/var/lib/torque-firecracker-k8s/fraud-platform}"
        NODE_COUNT="${NODE_COUNT:-6}"
        SUBNET_OCTET="${SUBNET_OCTET:-250}"
        BRIDGE_NAME="${BRIDGE_NAME:-tqfcfraud}"
        TAP_PREFIX="${TAP_PREFIX:-tqfrd}"
        RUN_ID="${RUN_ID:-fraud-platform}"
        # Host-side inputs: base root filesystem image, guest kernel, k3s binary,
        # Firecracker binary and the SSH private key used to reach the guests.
        BASE_ROOTFS="${BASE_ROOTFS:-/opt/firecracker-sandbox-lab/rootfs.ext4}"
        KERNEL="${KERNEL:-/opt/kata/share/kata-containers/vmlinux-6.18.28-194}"
        K3S_BIN="${K3S_BIN:-/usr/local/bin/k3s}"
        FIRECRACKER="${FIRECRACKER:-/usr/local/bin/firecracker}"
        LAB_KEY="${LAB_KEY:-/opt/firecracker-sandbox-lab/lab_ssh_key}"
        # Directory holding prepared (package-installed, resized) images, keyed by
        # a hash of their inputs.
        CACHE_ROOT="${CACHE_ROOT:-/var/lib/torque-firecracker-k8s/cache}"
        ROOTFS_SIZE="${ROOTFS_SIZE:-16G}"
        # Guest network: 172.31.<SUBNET_OCTET>.0/24 with the host bridge on .1 and
        # the k3s server (node 0) on .10.
        NET_PREFIX="172.31.${SUBNET_OCTET}"
        GATEWAY="${NET_PREFIX}.1"
        SERVER_IP="${NET_PREFIX}.10"
        CIDR="${NET_PREFIX}.0/24"
        TOKEN_FILE="${RUN_ROOT}/cluster-token"
        # Packages installed into the prepared image with apt-get.
        PACKAGES="iptables conntrack ipset ethtool socat ca-certificates curl wget tar gzip"
        # Non-interactive SSH options for talking to the guests; host keys are not
        # verified or recorded.
        SSH_OPTS=(-n -i "${LAB_KEY}" -o BatchMode=yes -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o ConnectTimeout=3)
        # Port forwards exposed on the host's public address, one per entry, in
        # the form "<public port>:<target ip>:<target port>".
        PUBLIC_RULES=(
          "30080:${SERVER_IP}:30080"
          "3301:${SERVER_IP}:32301"
          "2746:${SERVER_IP}:32746"
          "8081:${SERVER_IP}:32081"
          "8080:${SERVER_IP}:32080"
          "8082:${SERVER_IP}:32082"
          "8265:${SERVER_IP}:32665"
          "10001:${SERVER_IP}:32001"
        )

        # node_ip <index>: guest address for a node; node 0 is NET_PREFIX.10.
        node_ip() {
          local host_octet=$((10 + $1))
          printf '%s.%d' "${NET_PREFIX}" "${host_octet}"
        }

        # node_name <index>: zero-padded guest hostname, e.g. fc-03.
        node_name() {
          printf 'fc-%02d' "$1"
        }

        # node_mem <index>: guest memory in MiB; nodes 0 and 1 get 3072, the
        # remaining nodes 2048.
        node_mem() {
          case "$1" in
            0|1) echo 3072 ;;
            *) echo 2048 ;;
          esac
        }

        # public_ip: source address the host would use to reach 1.1.1.1, taken
        # from the "src" field of the routing lookup.
        public_ip() {
          ip -4 route get 1.1.1.1 |
            awk '{ for (i = 1; i <= NF; i++) if ($i == "src") { print $(i + 1); exit } }'
        }

        # Best-effort removal of the DNAT and FORWARD rule pair for every entry in
        # PUBLIC_RULES. A rule that is not currently installed is skipped silently.
        remove_public_access() {
          local host_ip entry ext_port dst_ip dst_port
          local -a dnat_match forward_match
          host_ip="$(public_ip)"
          for entry in "${PUBLIC_RULES[@]}"; do
            IFS=: read -r ext_port dst_ip dst_port <<<"${entry}"
            dnat_match=(-d "${host_ip}/32" -p tcp -m tcp --dport "${ext_port}" -j DNAT --to-destination "${dst_ip}:${dst_port}")
            forward_match=(-p tcp -d "${dst_ip}/32" -m tcp --dport "${dst_port}" -j ACCEPT)
            iptables -t nat -D PREROUTING "${dnat_match[@]}" 2>/dev/null || true
            iptables -D FORWARD "${forward_match[@]}" 2>/dev/null || true
          done
        }

        # Installs one DNAT rule (public address -> guest) and one matching FORWARD
        # ACCEPT rule per PUBLIC_RULES entry. Existing copies are removed first so
        # repeated calls do not stack duplicates.
        apply_public_access() {
          local host_ip entry ext_port dst_ip dst_port
          local -a dnat_match forward_match
          host_ip="$(public_ip)"
          remove_public_access
          for entry in "${PUBLIC_RULES[@]}"; do
            IFS=: read -r ext_port dst_ip dst_port <<<"${entry}"
            dnat_match=(-d "${host_ip}/32" -p tcp -m tcp --dport "${ext_port}" -j DNAT --to-destination "${dst_ip}:${dst_port}")
            forward_match=(-p tcp -d "${dst_ip}/32" -m tcp --dport "${dst_port}" -j ACCEPT)
            iptables -t nat -A PREROUTING "${dnat_match[@]}"
            iptables -A FORWARD "${forward_match[@]}"
            echo "public ${ext_port} -> ${dst_ip}:${dst_port}"
          done
        }

        # cleanup_run [remove_root]
        #
        # Best-effort teardown of everything apply creates: public port forwards,
        # Firecracker processes, stale rootfs mounts, tap devices, the bridge and
        # the MASQUERADE rule. Errors are ignored (errexit is suspended for the
        # duration) so a partially-created run can always be cleaned.
        #
        #   remove_root=1  delete RUN_ROOT entirely (including this script)
        #   otherwise      delete only per-run state and recreate RUN_ROOT
        cleanup_run() {
          local remove_root="${1:-0}"
          local pid_file vm_mnt i
          set +e
          remove_public_access
          if [[ -d "${RUN_ROOT}/vms" ]]; then
            # Ask each VM process to stop, give it a second, then force-kill.
            # NOTE(review): pids are trusted as recorded; a pid file that
            # survived a host reboot could name an unrelated process.
            for pid_file in "${RUN_ROOT}"/vms/*/pid; do
              [[ -f "${pid_file}" ]] && kill "$(cat "${pid_file}")" 2>/dev/null
            done
            sleep 1
            for pid_file in "${RUN_ROOT}"/vms/*/pid; do
              [[ -f "${pid_file}" ]] && kill -9 "$(cat "${pid_file}")" 2>/dev/null
            done
            # An apply that aborted while customizing a rootfs leaves the image
            # loop-mounted at vms/<node>/mnt. Unmount it before rm -rf, otherwise
            # the removal descends into the mounted image and the mount (and its
            # loop device) leaks. Fall back to a lazy unmount if it is busy.
            for vm_mnt in "${RUN_ROOT}"/vms/*/mnt; do
              if mountpoint -q "${vm_mnt}" 2>/dev/null; then
                umount "${vm_mnt}" 2>/dev/null || umount -l "${vm_mnt}" 2>/dev/null
              fi
            done
          fi
          for i in $(seq 0 "$((NODE_COUNT - 1))"); do
            ip link del "${TAP_PREFIX}${i}" 2>/dev/null
          done
          ip link set "${BRIDGE_NAME}" down 2>/dev/null
          ip link del "${BRIDGE_NAME}" type bridge 2>/dev/null
          iptables -t nat -D POSTROUTING -s "${CIDR}" ! -o "${BRIDGE_NAME}" -j MASQUERADE 2>/dev/null
          if [[ "${remove_root}" == "1" ]]; then
            rm -rf "${RUN_ROOT}"
          else
            rm -rf "${RUN_ROOT}/vms" "${TOKEN_FILE}" "${RUN_ROOT}/receipt.json" "${RUN_ROOT}/nodes.txt" "${RUN_ROOT}/pods.txt" "${RUN_ROOT}/kubeconfig.yaml" "${RUN_ROOT}/server-journal.txt"
            mkdir -p "${RUN_ROOT}"
          fi
          # The script runs under errexit; restore it for the caller.
          set -e
        }

        # require_cmd <name>: exit with status 2 and a message on stderr when the
        # command is not available on PATH.
        require_cmd() {
          if ! command -v "$1" >/dev/null 2>&1; then
            echo "missing required command: $1" >&2
            exit 2
          fi
        }

        # prepare_base_image
        #
        # Builds (or reuses) a cached root filesystem image that has PACKAGES and
        # the k3s binary installed, and prints the image path on stdout. Callers
        # capture that path with $(...), so nothing else may be written to stdout.
        #
        # The cache key is a hash over the base image, k3s binary, kernel, package
        # list and target size; a non-empty cached file with that key is reused.
        # Otherwise the base image is copied, checked, grown to ROOTFS_SIZE,
        # customized inside a chroot and atomically renamed into place.
        #
        # Exits non-zero when any build step fails; the partially built *.tmp
        # image is left behind and replaced by the next attempt.
        prepare_base_image() {
          local cache_key prepared tmp mnt fsck_rc
          # Bash (outside POSIX mode) clears errexit inside command substitutions,
          # which is how this function is invoked; turn it back on so a failing
          # step cannot be silently skipped.
          set -e
          mkdir -p "${CACHE_ROOT}"
          cache_key="$(
            {
              sha256sum "${BASE_ROOTFS}" "${K3S_BIN}" "${KERNEL}"
              printf 'packages=%s\n' "${PACKAGES}"
              printf 'rootfs-size=%s\n' "${ROOTFS_SIZE}"
              printf 'k3s-dns=yes\n'
            } | sha256sum | awk '{print substr($1,1,16)}'
          )"
          prepared="${CACHE_ROOT}/prepared-fraud-${cache_key}.ext4"
          if [[ -s "${prepared}" ]]; then
            echo "${prepared}"
            return
          fi
          tmp="${prepared}.tmp"
          mnt="${CACHE_ROOT}/mnt-fraud-${cache_key}"
          rm -f "${tmp}"
          cp --reflink=auto "${BASE_ROOTFS}" "${tmp}" 2>/dev/null || cp "${BASE_ROOTFS}" "${tmp}"
          # An e2fsck status of 0 or 1 is accepted; anything higher is fatal and
          # the log is replayed on stderr.
          fsck_rc=0
          e2fsck -fy "${tmp}" >/tmp/torque-fraud-e2fsck-${cache_key}.log 2>&1 || fsck_rc=$?
          [[ "${fsck_rc}" -le 1 ]] || { cat /tmp/torque-fraud-e2fsck-${cache_key}.log >&2; exit "${fsck_rc}"; }
          truncate -s "${ROOTFS_SIZE}" "${tmp}"
          resize2fs "${tmp}" >/tmp/torque-fraud-resize-${cache_key}.log 2>&1
          mkdir -p "${mnt}"
          # All mounted work happens in a subshell whose EXIT trap unmounts
          # everything. Unlike a RETURN trap, EXIT also fires when errexit aborts
          # the subshell, so a failed apt-get or mount cannot leak mounts.
          (
            release_mounts() {
              local target
              for target in "${mnt}/proc" "${mnt}/sys" "${mnt}/dev" "${mnt}/run" "${mnt}"; do
                # Loop to peel off stacked mounts left by earlier aborted runs.
                while mountpoint -q "${target}" 2>/dev/null; do
                  umount "${target}" || break
                done
              done
            }
            trap release_mounts EXIT
            release_mounts
            mount -o loop "${tmp}" "${mnt}"
            rm -f "${mnt}/etc/resolv.conf"
            printf 'nameserver 1.1.1.1\nnameserver 8.8.8.8\n' >"${mnt}/etc/resolv.conf"
            mount -t proc proc "${mnt}/proc"
            mount -t sysfs sysfs "${mnt}/sys"
            mount --bind /dev "${mnt}/dev"
            mount --bind /run "${mnt}/run"
            chroot "${mnt}" apt-get update >/tmp/torque-fraud-apt-update-${cache_key}.log 2>&1
            chroot "${mnt}" env DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends ${PACKAGES} >/tmp/torque-fraud-apt-install-${cache_key}.log 2>&1
            install -m 0755 "${K3S_BIN}" "${mnt}/usr/local/bin/k3s"
            # Switching to the legacy iptables backend is best-effort.
            chroot "${mnt}" update-alternatives --set iptables /usr/sbin/iptables-legacy >/dev/null 2>&1 || true
            chroot "${mnt}" update-alternatives --set ip6tables /usr/sbin/ip6tables-legacy >/dev/null 2>&1 || true
            # Keep any output of the login shell off stdout, which carries the
            # function's return value.
            chroot "${mnt}" /bin/bash -lc "apt-get clean && rm -rf /var/lib/apt/lists/*" >&2
          )
          # Publish only after a fully successful build and unmount.
          mv "${tmp}" "${prepared}"
          echo "${prepared}"
        }

        # write_hosts_file <index>: print the /etc/hosts content for a guest. The
        # guest's own name maps to 127.0.1.1, followed by the fixed six-node table
        # with role aliases.
        write_hosts_file() {
          printf '%s\n' \
            "127.0.0.1 localhost" \
            "127.0.1.1 $(node_name "$1")" \
            "${NET_PREFIX}.10 fc-00 control-plane" \
            "${NET_PREFIX}.11 fc-01 observability" \
            "${NET_PREFIX}.12 fc-02 events" \
            "${NET_PREFIX}.13 fc-03 processing" \
            "${NET_PREFIX}.14 fc-04 mlbatch" \
            "${NET_PREFIX}.15 fc-05 analytics"
        }

        # write_service <role> <ip>: print the systemd unit for k3s on a guest.
        # role "server" yields the k3s server unit; any other role yields the
        # agent unit, which joins the server at SERVER_IP. Both units read the
        # cluster token from the global TOKEN.
        write_service() {
          local role="$1"
          local ip="$2"
          local unit_description unit_exec
          # Only the description and the ExecStart line differ between roles.
          if [[ "${role}" == "server" ]]; then
            unit_description="Lightweight Kubernetes"
            unit_exec="/usr/local/bin/k3s server --cluster-init --node-ip ${ip} --advertise-address ${ip} --bind-address 0.0.0.0 --tls-san ${ip} --tls-san 127.0.0.1 --cluster-cidr 10.250.0.0/16 --service-cidr 10.251.0.0/16 --cluster-dns 10.251.0.10 --flannel-iface eth0 --flannel-backend host-gw --write-kubeconfig-mode 0644 --disable traefik --disable servicelb --disable metrics-server --disable-cloud-controller --disable-network-policy"
          else
            unit_description="Lightweight Kubernetes Agent"
            unit_exec="/usr/local/bin/k3s agent --server https://${SERVER_IP}:6443 --node-ip ${ip} --flannel-iface eth0"
          fi
          cat <<EOF
        [Unit]
        Description=${unit_description}
        Wants=network-online.target
        After=network-online.target
        [Service]
        Type=simple
        Environment=K3S_TOKEN=${TOKEN}
        ExecStart=${unit_exec}
        KillMode=process
        Delegate=yes
        LimitNOFILE=1048576
        LimitNPROC=infinity
        LimitCORE=infinity
        TasksMax=infinity
        Restart=always
        RestartSec=5s
        TimeoutStartSec=0
        [Install]
        WantedBy=multi-user.target
        EOF
        }

        # _write_receipt <status> <ready_count> [extra_json]
        # Writes ${RUN_ROOT}/receipt.json. extra_json, when given, is appended
        # verbatim before the closing brace and must start with a comma.
        _write_receipt() {
          local receipt_status="$1" receipt_ready="$2" receipt_extra="${3:-}"
          cat >"${RUN_ROOT}/receipt.json" <<EOF
        {"apiVersion":"torque.dev/firecracker-k8s/v1","kind":"FraudPlatformK3sReceipt","status":"${receipt_status}","runId":"${RUN_ID}","nodeCount":${NODE_COUNT},"readyCount":${receipt_ready},"serverIP":"${SERVER_IP}","subnet":"${CIDR}","bridge":"${BRIDGE_NAME}"${receipt_extra}}
        EOF
        }

        # apply_vms
        #
        # Brings the cluster up: verifies prerequisites, prepares the cached base
        # image, and either reuses an already-healthy cluster or rebuilds from
        # scratch (bridge, NAT, one Firecracker VM per node), then waits for SSH
        # and for every k3s node to report Ready.
        #
        # Outputs under RUN_ROOT: nodes.txt, pods.txt, kubeconfig.yaml and
        # receipt.json. The receipt status is "succeeded" only when all nodes are
        # Ready, and "failed" otherwise.
        #
        # Exits 2 on a missing command or input file, 1 when VMs are unreachable
        # or nodes do not become Ready.
        apply_vms() {
          local cmd path pid_file prepared i attempt status
          local live_count=0 ready_count=0 ssh_count=0 nodes_text=""
          local vm ip name tap mac mnt role mem service_name

          # ip and iptables are used unconditionally below, so check them up front
          # rather than failing midway through network setup.
          for cmd in cp curl e2fsck ip iptables mount resize2fs sha256sum ssh sysctl truncate umount openssl; do require_cmd "${cmd}"; done
          for path in "${BASE_ROOTFS}" "${KERNEL}" "${K3S_BIN}" "${FIRECRACKER}" "${LAB_KEY}"; do
            [[ -e "${path}" ]] || { echo "missing ${path}" >&2; exit 2; }
          done
          mkdir -p "${RUN_ROOT}/vms"
          prepared="$(prepare_base_image)"

          # Idempotent reuse: if a previous run left a receipt and every VM
          # process is alive and every node is Ready, refresh the artifacts and
          # return without rebuilding.
          if [[ -s "${RUN_ROOT}/receipt.json" && -d "${RUN_ROOT}/vms" ]]; then
            for pid_file in "${RUN_ROOT}"/vms/*/pid; do
              [[ -f "${pid_file}" ]] && kill -0 "$(cat "${pid_file}")" 2>/dev/null && live_count="$((live_count + 1))"
            done
            nodes_text="$(ssh "${SSH_OPTS[@]}" "root@${SERVER_IP}" 'timeout 20s /usr/local/bin/k3s kubectl get nodes -o wide --no-headers 2>/dev/null' || true)"
            ready_count="$(printf '%s\n' "${nodes_text}" | awk '$2=="Ready"{c++} END{print c+0}')"
            if [[ "${live_count}" -ge "${NODE_COUNT}" && "${ready_count}" -ge "${NODE_COUNT}" ]]; then
              ssh "${SSH_OPTS[@]}" "root@${SERVER_IP}" 'cat /etc/rancher/k3s/k3s.yaml' |
                sed "s#https://0.0.0.0:6443#https://${SERVER_IP}:6443#g" >"${RUN_ROOT}/kubeconfig.yaml"
              printf '%s\n' "${nodes_text}" >"${RUN_ROOT}/nodes.txt"
              _write_receipt "succeeded" "${ready_count}" ',"idempotentReuse":true'
              echo "fraud-platform-already-ready nodes=${ready_count} live=${live_count}"
              cat "${RUN_ROOT}/nodes.txt"
              return
            fi
          fi

          # Rebuild from a clean slate with a fresh cluster token.
          cleanup_run
          mkdir -p "${RUN_ROOT}/vms"
          openssl rand -hex 24 >"${TOKEN_FILE}"
          chmod 0600 "${TOKEN_FILE}"
          # TOKEN is intentionally global: write_service reads it.
          TOKEN="$(cat "${TOKEN_FILE}")"

          ip link add name "${BRIDGE_NAME}" type bridge
          ip addr add "${GATEWAY}/24" dev "${BRIDGE_NAME}"
          ip link set "${BRIDGE_NAME}" up
          sysctl -w net.ipv4.ip_forward=1 >/dev/null
          iptables -t nat -C POSTROUTING -s "${CIDR}" ! -o "${BRIDGE_NAME}" -j MASQUERADE 2>/dev/null ||
            iptables -t nat -A POSTROUTING -s "${CIDR}" ! -o "${BRIDGE_NAME}" -j MASQUERADE

          for i in $(seq 0 "$((NODE_COUNT - 1))"); do
            vm="${RUN_ROOT}/vms/node${i}"
            ip="$(node_ip "${i}")"
            name="$(node_name "${i}")"
            tap="${TAP_PREFIX}${i}"
            role="agent"
            service_name="k3s-agent.service"
            # Node 0 is the k3s server; every other node is an agent.
            [[ "${i}" == "0" ]] && role="server" && service_name="k3s.service"
            mem="$(node_mem "${i}")"
            mac="$(printf '06:00:00:%02x:02:%02x' "${SUBNET_OCTET}" "$((10 + i))")"
            mkdir -p "${vm}"
            cp --reflink=auto "${prepared}" "${vm}/rootfs.ext4" 2>/dev/null || cp "${prepared}" "${vm}/rootfs.ext4"
            e2fsck -fy "${vm}/rootfs.ext4" >/dev/null 2>&1 || true
            mnt="${vm}/mnt"
            mkdir -p "${mnt}"
            # Customize the per-node rootfs inside a subshell whose EXIT trap
            # unmounts the image, so a failure in any step below cannot leave
            # the image loop-mounted on the host.
            (
              trap 'mountpoint -q "${mnt}" && umount "${mnt}"' EXIT
              mount -o loop "${vm}/rootfs.ext4" "${mnt}"
              printf '%s\n' "${name}" >"${mnt}/etc/hostname"
              write_hosts_file "${i}" >"${mnt}/etc/hosts"
              cat >"${mnt}/etc/network/interfaces" <<EOF
        auto lo
        iface lo inet loopback

        auto eth0
        iface eth0 inet static
            address ${ip}
            netmask 255.255.255.0
            gateway ${GATEWAY}
        EOF
              rm -f "${mnt}/etc/resolv.conf"
              printf 'nameserver 1.1.1.1\nnameserver 8.8.8.8\n' >"${mnt}/etc/resolv.conf"
              # Empty machine-id so each clone does not share the base image's id.
              rm -f "${mnt}/etc/machine-id" "${mnt}/var/lib/dbus/machine-id" 2>/dev/null || true
              touch "${mnt}/etc/machine-id"
              mkdir -p "${mnt}/etc/systemd/system/multi-user.target.wants"
              ln -sf /lib/systemd/system/ssh.service "${mnt}/etc/systemd/system/multi-user.target.wants/ssh.service"
              write_service "${role}" "${ip}" >"${mnt}/etc/systemd/system/${service_name}"
              ln -sf "/etc/systemd/system/${service_name}" "${mnt}/etc/systemd/system/multi-user.target.wants/${service_name}"
              # On the success path an unmount failure must stay fatal: the VM
              # must not boot from an image the host still has mounted.
              umount "${mnt}"
              trap - EXIT
            )
            ip tuntap add dev "${tap}" mode tap
            ip link set "${tap}" master "${BRIDGE_NAME}"
            ip link set "${tap}" up
            cat >"${vm}/vm.json" <<EOF
        {"boot-source":{"kernel_image_path":"${KERNEL}","boot_args":"console=ttyS0 reboot=k panic=1 pci=off root=/dev/vda rw random.trust_cpu=on systemd.unified_cgroup_hierarchy=1 systemd.mask=serial-getty@ttyS0.service systemd.mask=systemd-random-seed.service"},"drives":[{"drive_id":"rootfs","path_on_host":"${vm}/rootfs.ext4","is_root_device":true,"is_read_only":false}],"machine-config":{"vcpu_count":2,"mem_size_mib":${mem}},"network-interfaces":[{"iface_id":"eth0","host_dev_name":"${tap}","guest_mac":"${mac}"}],"logger":{"log_path":"${vm}/firecracker.log","level":"Info","show_level":true,"show_log_origin":true}}
        EOF
            "${FIRECRACKER}" --api-sock "${vm}/fc.sock" --config-file "${vm}/vm.json" >"${vm}/console.log" 2>&1 &
            echo $! >"${vm}/pid"
            echo "started ${name} ${ip} mem=${mem}"
          done

          # Wait for SSH on every guest: up to 180 probes per node, 2s apart.
          for i in $(seq 0 "$((NODE_COUNT - 1))"); do
            for _ in $(seq 1 180); do
              if ssh "${SSH_OPTS[@]}" "root@$(node_ip "${i}")" true >/dev/null 2>&1; then
                ssh_count="$((ssh_count + 1))"
                break
              fi
              sleep 2
            done
          done
          [[ "${ssh_count}" -eq "${NODE_COUNT}" ]] || { echo "only ${ssh_count}/${NODE_COUNT} VMs reachable" >&2; exit 1; }

          # Wait for every node to report Ready: up to 180 polls, 3s apart, with
          # a progress line every tenth attempt.
          ready_count=0
          nodes_text=""
          for attempt in $(seq 1 180); do
            nodes_text="$(ssh "${SSH_OPTS[@]}" "root@${SERVER_IP}" 'timeout 20s /usr/local/bin/k3s kubectl get nodes -o wide --no-headers 2>/dev/null' || true)"
            ready_count="$(printf '%s\n' "${nodes_text}" | awk '$2=="Ready"{c++} END{print c+0}')"
            [[ "${ready_count}" -ge "${NODE_COUNT}" ]] && break
            if (( attempt % 10 == 0 )); then
              echo "waiting-k3s nodes=${ready_count}/${NODE_COUNT}" >&2
              printf '%s\n' "${nodes_text}" >&2
            fi
            sleep 3
          done
          # Decide the outcome before writing the receipt so the receipt never
          # claims success for a cluster that did not become Ready.
          status="succeeded"
          [[ "${ready_count}" -ge "${NODE_COUNT}" ]] || status="failed"
          printf '%s\n' "${nodes_text}" >"${RUN_ROOT}/nodes.txt"
          ssh "${SSH_OPTS[@]}" "root@${SERVER_IP}" 'cat /etc/rancher/k3s/k3s.yaml' |
            sed "s#https://0.0.0.0:6443#https://${SERVER_IP}:6443#g" >"${RUN_ROOT}/kubeconfig.yaml"
          ssh "${SSH_OPTS[@]}" "root@${SERVER_IP}" 'timeout 20s /usr/local/bin/k3s kubectl get pods -A -o wide || true' >"${RUN_ROOT}/pods.txt" 2>&1 || true
          _write_receipt "${status}" "${ready_count}"
          [[ "${ready_count}" -ge "${NODE_COUNT}" ]] || { echo "only ${ready_count}/${NODE_COUNT} k3s nodes ready" >&2; exit 1; }
          cat "${RUN_ROOT}/nodes.txt"
        }

        # Entry point of the helper script: run the operation selected by the
        # first argument. Unknown modes exit with status 2.
        if [[ "${mode}" == "apply" ]]; then
          apply_vms
        elif [[ "${mode}" == "public-apply" ]]; then
          apply_public_access
        elif [[ "${mode}" == "public-delete" ]]; then
          remove_public_access
        elif [[ "${mode}" == "delete" || "${mode}" == "cleanup" ]]; then
          cleanup_run 1
        else
          echo "unknown mode: ${mode}" >&2
          exit 2
        fi
        REMOTE
        chmod +x "${RUN_ROOT}/fraud-k3s-lab.sh"
        # Run the freshly written helper with this node's settings in its environment.
        env RUN_ROOT="${RUN_ROOT}" NODE_COUNT="${NODE_COUNT}" SUBNET_OCTET="${SUBNET_OCTET}" \
          BRIDGE_NAME="${BRIDGE_NAME}" TAP_PREFIX="${TAP_PREFIX}" RUN_ID="${RUN_ID}" \
          "${RUN_ROOT}/fraud-k3s-lab.sh" apply
      # Teardown. Prefers the helper script's "delete" mode; if the script is
      # gone, falls back to removing the default-named resources by hand.
      deleteCommand: |
        set +e
        RUN_ROOT="/var/lib/torque-firecracker-k8s/fraud-platform"
        LAB_SCRIPT="${RUN_ROOT}/fraud-k3s-lab.sh"
        if [ ! -x "${LAB_SCRIPT}" ]; then
          for pid_file in "${RUN_ROOT}"/vms/*/pid; do
            if [ -f "${pid_file}" ]; then
              kill "$(cat "${pid_file}")" 2>/dev/null
            fi
          done
          for idx in 0 1 2 3 4 5; do
            ip link del "tqfrd${idx}" 2>/dev/null
          done
          ip link set tqfcfraud down 2>/dev/null
          ip link del tqfcfraud type bridge 2>/dev/null
          iptables -t nat -D POSTROUTING -s "172.31.250.0/24" ! -o tqfcfraud -j MASQUERADE 2>/dev/null
          rm -rf "${RUN_ROOT}"
        else
          RUN_ROOT="${RUN_ROOT}" "${LAB_SCRIPT}" delete
        fi

  # Copies the cluster kubeconfig from the lab host, rewrites it to point at a
  # local port, and opens a background SSH tunnel from that port to the k3s API
  # server. Succeeds once kubectl can list nodes through the tunnel.
  #
  # Environment:
  #   TORQUE_LAB_SSH / TORQUE_LAB_PUBLIC_IP  lab host (one of them is required)
  #   TORQUE_FRAUD_API_PORT                  local API port (default 16450)
  #   TORQUE_FRAUD_KUBECONFIG                kubeconfig path
  #   TORQUE_FRAUD_SSH_CONTROL               SSH control socket path
  - name: fc-k8s-tunnel
    kind: host.command.run
    needs: [fc-k8s-bootstrap]
    host:
      transport: local
      timeout: 5m
      command: |
        set -euo pipefail
        # The kubeconfig is the k3s admin config and lands in /tmp by default;
        # make sure nothing this script creates is readable by other users.
        umask 077
        LAB_TARGET="${TORQUE_LAB_SSH:-ssh://root@${TORQUE_LAB_PUBLIC_IP:?set TORQUE_LAB_PUBLIC_IP or TORQUE_LAB_SSH}}"
        LAB_TARGET="${LAB_TARGET#ssh://}"
        RUN_ROOT="/var/lib/torque-firecracker-k8s/fraud-platform"
        SERVER_IP="172.31.250.10"
        API_PORT="${TORQUE_FRAUD_API_PORT:-16450}"
        KUBECONFIG_PATH="${TORQUE_FRAUD_KUBECONFIG:-/tmp/torque-fraud-platform.kubeconfig}"
        CONTROL_PATH="${TORQUE_FRAUD_SSH_CONTROL:-/tmp/torque-fraud-platform.ctl}"
        ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new "${LAB_TARGET}" "cat '${RUN_ROOT}/kubeconfig.yaml'" |
          sed "s#https://${SERVER_IP}:6443#https://127.0.0.1:${API_PORT}#g" >"${KUBECONFIG_PATH}"
        # umask only applies to newly created files; tighten a file left over
        # from an earlier run as well.
        chmod 0600 "${KUBECONFIG_PATH}"
        # Close any tunnel from a previous run before opening a new master.
        ssh -S "${CONTROL_PATH}" -O exit "${LAB_TARGET}" >/dev/null 2>&1 || true
        rm -f "${CONTROL_PATH}"
        ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new -o ExitOnForwardFailure=yes \
          -M -S "${CONTROL_PATH}" -fN -L "127.0.0.1:${API_PORT}:${SERVER_IP}:6443" "${LAB_TARGET}"
        # Poll for up to 60 x 2s until the API answers through the tunnel.
        for i in $(seq 1 60); do
          if kubectl --kubeconfig "${KUBECONFIG_PATH}" get nodes >/dev/null 2>&1; then
            kubectl --kubeconfig "${KUBECONFIG_PATH}" get nodes -o wide
            exit 0
          fi
          sleep 2
        done
        echo "tunnel did not expose Kubernetes API" >&2
        exit 1
      deleteCommand: |
        set +e
        # Teardown must not abort when the lab variables are unset: a ${VAR:?}
        # expansion would exit the shell before the local kubeconfig and socket
        # are removed. The control socket path identifies the master, so a
        # placeholder destination is enough to satisfy ssh's argument parsing.
        LAB_TARGET="${TORQUE_LAB_SSH:-ssh://root@${TORQUE_LAB_PUBLIC_IP:-torque-lab}}"
        LAB_TARGET="${LAB_TARGET#ssh://}"
        KUBECONFIG_PATH="${TORQUE_FRAUD_KUBECONFIG:-/tmp/torque-fraud-platform.kubeconfig}"
        CONTROL_PATH="${TORQUE_FRAUD_SSH_CONTROL:-/tmp/torque-fraud-platform.ctl}"
        ssh -S "${CONTROL_PATH}" -O exit "${LAB_TARGET}" >/dev/null 2>&1
        rm -f "${CONTROL_PATH}" "${KUBECONFIG_PATH}"

  # Ensures the demo S3 bucket exists (versioning enabled, best-effort), creates
  # the workload namespaces, and publishes the caller's AWS credentials as the
  # "aws-s3" secret in the namespaces that need them.
  #
  # Environment:
  #   AWS_REGION               region override; otherwise the CLI's configured
  #                            region, otherwise us-east-1
  #   TORQUE_FRAUD_S3_BUCKET   bucket name override
  #   TORQUE_FRAUD_KUBECONFIG  kubeconfig path
  - name: aws-s3-bootstrap
    kind: host.command.run
    needs: [fc-k8s-tunnel]
    host:
      transport: local
      timeout: 10m
      command: |
        set -euo pipefail
        KUBECONFIG_PATH="${TORQUE_FRAUD_KUBECONFIG:-/tmp/torque-fraud-platform.kubeconfig}"
        # `aws configure get` exits non-zero when no region is configured. The
        # assignment takes the substitution's status, so without `|| true`
        # errexit would end the script before the us-east-1 fallback below.
        region="${AWS_REGION:-$(aws configure get region || true)}"
        region="${region:-us-east-1}"
        account="$(aws sts get-caller-identity --query Account --output text)"
        bucket="${TORQUE_FRAUD_S3_BUCKET:-torque-fraud-demo-${account}-${region}}"
        if ! aws s3api head-bucket --bucket "${bucket}" >/dev/null 2>&1; then
          # us-east-1 is created without a LocationConstraint; every other
          # region passes one.
          if [[ "${region}" == "us-east-1" ]]; then
            aws s3api create-bucket --bucket "${bucket}" --region "${region}" >/dev/null
          else
            aws s3api create-bucket --bucket "${bucket}" --region "${region}" --create-bucket-configuration LocationConstraint="${region}" >/dev/null
          fi
        fi
        aws s3api put-bucket-versioning --bucket "${bucket}" --versioning-configuration Status=Enabled >/dev/null || true
        # Export the CLI's resolved credentials into this shell via a private
        # temp file that is removed on exit.
        tmp="$(mktemp)"
        trap 'rm -f "${tmp}"' EXIT
        aws configure export-credentials --format env-no-export >"${tmp}"
        set -a
        # shellcheck disable=SC1090
        . "${tmp}"
        set +a
        for ns in apps data stream ml argo observability; do
          kubectl --kubeconfig "${KUBECONFIG_PATH}" create namespace "${ns}" --dry-run=client -o yaml |
            kubectl --kubeconfig "${KUBECONFIG_PATH}" apply -f -
        done
        # create_aws_secret <namespace>: create or update the aws-s3 secret.
        # The session token is included only when one is present.
        create_aws_secret() {
          local target_ns="$1"
          local -a args=(
            --from-literal="S3_BUCKET=${bucket}"
            --from-literal="AWS_DEFAULT_REGION=${region}"
            --from-literal="AWS_REGION=${region}"
            --from-literal="AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}"
            --from-literal="AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}"
          )
          if [[ -n "${AWS_SESSION_TOKEN:-}" ]]; then
            args+=(--from-literal="AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN}")
          fi
          kubectl --kubeconfig "${KUBECONFIG_PATH}" -n "${target_ns}" create secret generic aws-s3 "${args[@]}" --dry-run=client -o yaml |
            kubectl --kubeconfig "${KUBECONFIG_PATH}" apply -f -
        }
        for ns in apps argo ml data; do
          create_aws_secret "${ns}"
        done
        printf 'S3_BUCKET=%s\nAWS_REGION=%s\n' "${bucket}" "${region}" > /tmp/torque-fraud-s3.env
        echo "s3_bucket=${bucket}"
      # Nothing is torn down here: the bucket is left in place.
      deleteCommand: "true"

  # Installs the shared platform layer: workload labels on the six nodes, Argo
  # Workflows (exposed on NodePort 32746) and, further down, SigNoz.
  - name: platform-install
    kind: host.command.run
    needs: [aws-s3-bootstrap]
    host:
      transport: local
      timeout: 45m
      command: |
        set -euo pipefail
        KUBECONFIG_PATH="${TORQUE_FRAUD_KUBECONFIG:-/tmp/torque-fraud-platform.kubeconfig}"
        k() { kubectl --kubeconfig "${KUBECONFIG_PATH}" "$@"; }
        # Pin one workload class per node; entries are "<node>=<workload>".
        for placement in \
          fc-00=control \
          fc-01=observability \
          fc-02=events \
          fc-03=processing \
          fc-04=mlbatch \
          fc-05=analytics; do
          k label node "${placement%%=*}" "fraud.torque.dev/workload=${placement#*=}" --overwrite
        done

        # Argo Workflows, plus a Role/RoleBinding letting the "argo" service
        # account manage workflowtaskresults.
        k apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.6.19/install.yaml
        k -n argo apply -f - <<'YAML'
        apiVersion: rbac.authorization.k8s.io/v1
        kind: Role
        metadata:
          name: argo-workflow-taskresults
        rules:
          - apiGroups: ["argoproj.io"]
            resources: ["workflowtaskresults"]
            verbs: ["create", "get", "list", "patch", "update", "watch"]
        ---
        apiVersion: rbac.authorization.k8s.io/v1
        kind: RoleBinding
        metadata:
          name: argo-workflow-taskresults
        roleRef:
          apiGroup: rbac.authorization.k8s.io
          kind: Role
          name: argo-workflow-taskresults
        subjects:
          - kind: ServiceAccount
            name: argo
            namespace: argo
        YAML
        # Appending the auth-mode argument is best-effort; exposing the server
        # on a fixed NodePort is not.
        k -n argo patch deployment argo-server --type='json' \
          -p='[{"op":"add","path":"/spec/template/spec/containers/0/args/-","value":"--auth-mode=server"}]' >/dev/null 2>&1 || true
        k -n argo patch service argo-server \
          -p '{"spec":{"type":"NodePort","ports":[{"name":"web","port":2746,"targetPort":2746,"nodePort":32746}]}}'

        # SigNoz: render the Helm values to a temp file (removed on exit), then
        # install or upgrade the release. Everything is pinned to the
        # observability node and runs without persistence.
        signoz_values="$(mktemp)"
        trap 'rm -f "${signoz_values}"' EXIT
        cat >"${signoz_values}" <<'YAML'
        global:
          storageClass: local-path
          clusterName: firecracker-fraud-platform
        clickhouse:
          password: torque-clickhouse
          persistence:
            enabled: false
          nodeSelector:
            fraud.torque.dev/workload: observability
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 1000m
              memory: 900Mi
          zookeeper:
            replicaCount: 1
            heapSize: 256
            persistence:
              enabled: false
            nodeSelector:
              fraud.torque.dev/workload: observability
            resources:
              requests:
                cpu: 50m
                memory: 128Mi
              limits:
                cpu: 500m
                memory: 384Mi
        signoz:
          persistence:
            enabled: false
          service:
            type: NodePort
            nodePort: 32301
          nodeSelector:
            fraud.torque.dev/workload: observability
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 750m
              memory: 768Mi
        otelCollector:
          nodeSelector:
            fraud.torque.dev/workload: observability
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 512Mi
        telemetryStoreMigrator:
          nodeSelector:
            fraud.torque.dev/workload: observability
        YAML
        helm repo add signoz https://charts.signoz.io >/dev/null 2>&1 || true
        helm repo update signoz >/dev/null
        # A failed upgrade is tolerated only if the release still reports
        # "deployed"; otherwise the script stops here.
        helm upgrade --install signoz signoz/signoz -n observability --kubeconfig "${KUBECONFIG_PATH}" -f "${signoz_values}" --timeout 25m ||
          helm status signoz -n observability --kubeconfig "${KUBECONFIG_PATH}" | grep -q 'STATUS: deployed'

        # Wait (up to 180 polls, 5s apart) until every pod in the observability
        # namespace is either Completed or Running with all containers ready.
        # A failed or empty pod listing counts as "not ready yet" rather than as
        # success, so an API hiccup cannot end the wait early. Running out of
        # attempts is reported on stderr but is not fatal; the final listing
        # below shows the state that was reached.
        observability_ready=0
        for attempt in $(seq 1 180); do
          if pod_rows="$(k get pods -n observability --no-headers 2>/dev/null)" && [[ -n "${pod_rows}" ]]; then
            not_ready="$(printf '%s\n' "${pod_rows}" | awk '{split($2,ready,"/")} $3!="Running" && $3!="Completed"{print} $3=="Running" && ready[1]!=ready[2]{print}' || true)"
            if [[ -z "${not_ready}" ]]; then
              observability_ready=1
              break
            fi
          fi
          # Progress report roughly once a minute.
          if (( attempt % 12 == 0 )); then
            echo "waiting-observability attempt=${attempt}" >&2
            k get pods -n observability -o wide >&2 || true
          fi
          sleep 5
        done
        if (( observability_ready == 0 )); then
          echo "warning: observability pods not all ready after 180 attempts; continuing" >&2
        fi
        k get pods -n observability -o wide
      # Teardown: remove the SigNoz release, then the argo and observability
      # namespaces. Every step is best-effort.
      deleteCommand: |
        set +e
        KUBECONFIG_PATH="${TORQUE_FRAUD_KUBECONFIG:-/tmp/torque-fraud-platform.kubeconfig}"
        k() { kubectl --kubeconfig "${KUBECONFIG_PATH}" "$@"; }
        helm uninstall signoz --namespace observability --kubeconfig "${KUBECONFIG_PATH}" >/dev/null 2>&1
        k delete namespace argo observability --ignore-not-found --timeout=120s

  - name: workloads-install
    kind: host.command.run
    needs: [platform-install]
    host:
      transport: local
      timeout: 45m
      command: |
        set -euo pipefail
        KUBECONFIG_PATH="${TORQUE_FRAUD_KUBECONFIG:-/tmp/torque-fraud-platform.kubeconfig}"
        ROOT="testdata/stack/e2e/21-firecracker-fraud-platform"
        k() { kubectl --kubeconfig "${KUBECONFIG_PATH}" "$@"; }
        secret_value() {
          k -n data get secret aws-s3 -o "jsonpath={.data.${1}}" |
            python3 -c 'import base64, sys; sys.stdout.write(base64.b64decode(sys.stdin.read()).decode())'
        }
        secret_value_or_empty() {
          k -n data get secret aws-s3 -o "jsonpath={.data.${1}}" 2>/dev/null |
            python3 -c 'import base64, sys; data=sys.stdin.read(); sys.stdout.write(base64.b64decode(data).decode() if data else "")'
        }

        iceberg_bucket="$(secret_value S3_BUCKET)"
        iceberg_region="$(secret_value_or_empty AWS_REGION)"
        if [[ -z "${iceberg_region}" ]]; then
          iceberg_region="$(secret_value AWS_DEFAULT_REGION)"
        fi
        tmp_iceberg_catalog="$(mktemp)"
        cat >"${tmp_iceberg_catalog}" <<EOF
        connector.name=iceberg
        iceberg.catalog.type=rest
        iceberg.rest-catalog.uri=http://iceberg-rest.data.svc.cluster.local:8181
        iceberg.rest-catalog.warehouse=s3://${iceberg_bucket}/iceberg/warehouse
        iceberg.file-format=PARQUET
        fs.s3.enabled=true
        s3.region=${iceberg_region}
        EOF
        k -n data create configmap trino-iceberg-catalog --from-file=iceberg.properties="${tmp_iceberg_catalog}" --dry-run=client -o yaml | k apply -f -
        rm -f "${tmp_iceberg_catalog}"

        k -n apps create configmap payments-api-code --from-file=payment_api.py="${ROOT}/app/payment_api.py" --dry-run=client -o yaml | k apply -f -
        k -n apps create configmap payments-generator-code --from-file=generator.py="${ROOT}/app/generator.py" --dry-run=client -o yaml | k apply -f -
        k -n ml create configmap ray-model-code --from-file=ray_model.py="${ROOT}/app/ray_model.py" --dry-run=client -o yaml | k apply -f -
        k -n argo create configmap spark-batch-code --from-file=spark_batch.py="${ROOT}/app/spark_batch.py" --dry-run=client -o yaml | k apply -f -
        k -n data delete job redpanda-topic-init --ignore-not-found --wait=false >/dev/null 2>&1 || true
        k -n data delete job fraud-data-contracts --ignore-not-found --wait=false >/dev/null 2>&1 || true
        k -n ml delete job ray-serve-deploy --ignore-not-found --wait=false >/dev/null 2>&1 || true

        k apply -f - <<'YAML'
        apiVersion: v1
        kind: Service
        metadata:
          name: redpanda
          namespace: data
        spec:
          selector:
            app: redpanda
          ports:
            - name: kafka
              port: 9092
              targetPort: 9092
            - name: admin
              port: 9644
              targetPort: 9644
            - name: registry
              port: 8081
              targetPort: 8081
        ---
        # Single-replica Redpanda broker, governed by the `redpanda` Service and
        # scheduled onto nodes labelled fraud.torque.dev/workload=events.
        # NOTE(review): the pod template declares no volumes or volumeClaimTemplates,
        # so broker data presumably lives on the container filesystem - confirm that
        # losing topics and schemas on pod replacement is acceptable for this lab.
        apiVersion: apps/v1
        kind: StatefulSet
        metadata:
          name: redpanda
          namespace: data
        spec:
          serviceName: redpanda
          replicas: 1
          selector:
            matchLabels:
              app: redpanda
          template:
            metadata:
              labels:
                app: redpanda
            spec:
              nodeSelector:
                fraud.torque.dev/workload: events
              containers:
                - name: redpanda
                  image: redpandadata/redpanda:v26.1.9
                  # Broker flags: one core and 768M for Redpanda (below the 1000Mi
                  # container limit), Kafka listener on 9092 advertised under the
                  # Service DNS name, schema registry on 8081, and the internal RPC
                  # listener on 33145 advertised under the pod's own DNS name.
                  args:
                    - redpanda
                    - start
                    - --smp=1
                    - --memory=768M
                    - --reserve-memory=0M
                    - --overprovisioned
                    - --node-id=0
                    - --check=false
                    - --kafka-addr=PLAINTEXT://0.0.0.0:9092
                    - --advertise-kafka-addr=PLAINTEXT://redpanda.data.svc.cluster.local:9092
                    - --schema-registry-addr=0.0.0.0:8081
                    - --rpc-addr=0.0.0.0:33145
                    - --advertise-rpc-addr=redpanda-0.redpanda.data.svc.cluster.local:33145
                  ports:
                    - containerPort: 9092
                      name: kafka
                    - containerPort: 9644
                      name: admin
                    - containerPort: 8081
                      name: registry
                  resources:
                    requests:
                      cpu: 200m
                      memory: 512Mi
                    limits:
                      cpu: "1"
                      memory: 1000Mi
                  # The pod is Ready once the admin API's ready endpoint answers.
                  readinessProbe:
                    httpGet:
                      path: /v1/status/ready
                      port: 9644
                    initialDelaySeconds: 20
                    periodSeconds: 5
        ---
        # One-shot Job that creates the Kafka topics used by the fraud pipeline.
        # The script first blocks until the broker answers a metadata request. Without
        # that wait, a Job pod that starts before Redpanda is reachable would run
        # every `rpk topic create ... || true`, exit 0 and be reported complete
        # without having created a single topic. The wait itself is bounded by the
        # `kubectl wait --timeout` the deployment script applies to this Job.
        apiVersion: batch/v1
        kind: Job
        metadata:
          name: redpanda-topic-init
          namespace: data
        spec:
          template:
            spec:
              restartPolicy: OnFailure
              containers:
                - name: topic-init
                  image: redpandadata/redpanda:v26.1.9
                  command: ["/bin/bash", "-lc"]
                  args:
                    - |
                      brokers=redpanda.data.svc.cluster.local:9092
                      # Do not attempt topic creation until the broker is reachable.
                      until rpk cluster info -X brokers="${brokers}" >/dev/null 2>&1; do
                        sleep 2
                      done
                      for topic in payments.raw payments.risk payments.decisions payments.raw.replay; do
                        # `|| true` is kept from the original, presumably so that a
                        # topic that already exists does not fail a re-run.
                        rpk topic create "${topic}" -X brokers="${brokers}" || true
                      done
        ---
        # One-shot Job that registers the JSON Schema contracts for the payments
        # topics with the schema registry served by Redpanda on port 8081. The
        # embedded Python uses only the standard library.
        apiVersion: batch/v1
        kind: Job
        metadata:
          name: fraud-data-contracts
          namespace: data
        spec:
          template:
            spec:
              restartPolicy: OnFailure
              containers:
                - name: register-contracts
                  image: python:3.12-slim
                  command:
                    - /bin/bash
                    - "-lc"
                  args:
                    - |
                      python - <<'PY'
                      import json
                      import time
                      import urllib.request

                      REGISTRY_URL = "http://redpanda.data.svc.cluster.local:8081"
                      SCHEMA_MEDIA_TYPE = "application/vnd.schemaregistry.v1+json"


                      def request(method, path, payload=None):
                          # Send one registry call; returns the decoded JSON reply, or {}
                          # when the response body is empty.
                          encoded = json.dumps(payload).encode() if payload is not None else None
                          call = urllib.request.Request(
                              REGISTRY_URL + path,
                              data=encoded,
                              headers={"content-type": SCHEMA_MEDIA_TYPE, "accept": SCHEMA_MEDIA_TYPE},
                              method=method,
                          )
                          with urllib.request.urlopen(call, timeout=5) as reply:
                              text = reply.read().decode()
                          return json.loads(text) if text else {}


                      # Probe the registry up to 120 times, 2 s apart, before giving up.
                      for _attempt in range(120):
                          try:
                              print(request("GET", "/schemas/types"), flush=True)
                          except Exception as failure:
                              last_failure = failure
                              time.sleep(2)
                          else:
                              break
                      else:
                          raise RuntimeError(f"schema registry not ready: {last_failure}")

                      # Field names of the raw payment event, grouped by JSON type.
                      field_names_by_type = {
                          "string": [
                              "event_id",
                              "event_ts",
                              "user_id",
                              "account_id",
                              "currency",
                              "merchant_id",
                              "merchant_name",
                              "merchant_category",
                              "country",
                              "billing_country",
                              "device_type",
                              "device_id",
                          ],
                          "number": ["amount"],
                          "integer": ["velocity_5m", "is_fraud_label"],
                      }
                      common = {
                          "$schema": "http://json-schema.org/draft-07/schema#",
                          "type": "object",
                          "additionalProperties": True,
                          "required": [
                              "event_id",
                              "event_ts",
                              "user_id",
                              "account_id",
                              "amount",
                              "merchant_id",
                              "merchant_category",
                              "country",
                              "billing_country",
                              "velocity_5m",
                          ],
                          "properties": {
                              name: {"type": json_type}
                              for json_type, names in field_names_by_type.items()
                              for name in names
                          },
                      }
                      decision_enum = {"type": "string", "enum": ["approve", "review", "decline"]}


                      def extend_contract(extra_required, extra_properties):
                          # The JSON round-trip is a deep copy, so `common` is never mutated.
                          contract = json.loads(json.dumps(common))
                          contract["required"] = common["required"] + extra_required
                          contract["properties"].update(extra_properties)
                          return contract


                      # NOTE(review): the Flink `risk_events` sink created later in this
                      # script has no account_id/merchant_id columns, yet this contract
                      # inherits both as required - confirm which side is intended.
                      risk = extend_contract(
                          ["rule_risk", "rule_decision"],
                          {"rule_risk": {"type": "number"}, "rule_decision": decision_enum},
                      )
                      decisions = extend_contract(
                          ["rule_score", "ml_score", "decision", "decision_ts"],
                          {
                              "rule_score": {"type": "number"},
                              "ml_score": {"type": "number"},
                              "decision": decision_enum,
                              "decision_ts": {"type": "string"},
                          },
                      )

                      ownership = {
                          "properties": {
                              "owner": "fraud-platform",
                              "domain": "payments-risk",
                              "tier": "lab-contract",
                          }
                      }
                      contracts = [
                          ("payments.raw-value", common),
                          ("payments.risk-value", risk),
                          ("payments.decisions-value", decisions),
                      ]
                      # Per subject: set BACKWARD compatibility, then register the schema.
                      for subject, schema in contracts:
                          request("PUT", f"/config/{subject}", {"compatibility": "BACKWARD"})
                          registration = {
                              "schemaType": "JSON",
                              "schema": json.dumps(schema, sort_keys=True),
                              "metadata": ownership,
                          }
                          response = request("POST", f"/subjects/{subject}/versions", registration)
                          print({subject: response}, flush=True)
                      PY
        ---
        # Service in the data namespace exposing the Iceberg REST catalog pods
        # (app=iceberg-rest) on port 8181.
        apiVersion: v1
        kind: Service
        metadata:
          name: iceberg-rest
          namespace: data
        spec:
          selector:
            app: iceberg-rest
          ports:
            - name: http
              port: 8181
              targetPort: 8181
        ---
        # Single-replica Iceberg REST catalog, scheduled onto nodes labelled
        # fraud.torque.dev/workload=analytics. All keys of the `aws-s3` Secret are
        # injected as environment variables.
        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: iceberg-rest
          namespace: data
        spec:
          replicas: 1
          selector:
            matchLabels:
              app: iceberg-rest
          template:
            metadata:
              labels:
                app: iceberg-rest
            spec:
              nodeSelector:
                fraud.torque.dev/workload: analytics
              containers:
                - name: catalog
                  image: tabulario/iceberg-rest:1.6.0
                  envFrom:
                    - secretRef:
                        name: aws-s3
                  # $(S3_BUCKET) and $(AWS_REGION) use Kubernetes' $(VAR) reference
                  # syntax; both are presumably keys of the aws-s3 Secret - confirm.
                  env:
                    - name: CATALOG_WAREHOUSE
                      value: s3://$(S3_BUCKET)/iceberg/warehouse
                    - name: CATALOG_IO__IMPL
                      value: org.apache.iceberg.aws.s3.S3FileIO
                    - name: CATALOG_S3_REGION
                      value: $(AWS_REGION)
                  ports:
                    - containerPort: 8181
                      name: http
                  resources:
                    requests:
                      cpu: 100m
                      memory: 384Mi
                    limits:
                      cpu: "1"
                      memory: 768Mi
                  # Ready once the catalog's /v1/config endpoint answers.
                  readinessProbe:
                    httpGet:
                      path: /v1/config
                      port: 8181
                    initialDelaySeconds: 15
                    periodSeconds: 5
        ---
        # Configuration files for the single-node Trino server. The trino Deployment
        # mounts config.properties, node.properties, jvm.config and log.properties
        # under /etc/trino, and clickhouse.properties under /etc/trino/catalog.
        # The node acts as coordinator and is also allowed to schedule work on
        # itself (node-scheduler.include-coordinator=true).
        # NOTE(review): the ClickHouse user and password are stored here in plain
        # text; consider moving them to a Secret if this leaves the lab.
        apiVersion: v1
        kind: ConfigMap
        metadata:
          name: trino-config
          namespace: data
        data:
          config.properties: |
            coordinator=true
            node-scheduler.include-coordinator=true
            http-server.http.port=8080
            discovery.uri=http://localhost:8080
            query.max-memory=384MB
            query.max-memory-per-node=256MB
            memory.heap-headroom-per-node=128MB
          node.properties: |
            node.environment=fraud_lab
            node.id=trino-fraud-lab
            node.data-dir=/tmp/trino
          jvm.config: |
            -server
            -Xmx768M
            -XX:+ExplicitGCInvokesConcurrent
            -XX:+ExitOnOutOfMemoryError
            -Dfile.encoding=UTF-8
            --add-modules=jdk.incubator.vector
          log.properties: |
            io.trino=INFO
          clickhouse.properties: |
            connector.name=clickhouse
            connection-url=jdbc:clickhouse://signoz-clickhouse.observability.svc.cluster.local:8123/
            connection-user=admin
            connection-password=torque-clickhouse
        ---
        # NodePort Service for Trino: port 8080 of the app=trino pods is published
        # on node port 32082.
        apiVersion: v1
        kind: Service
        metadata:
          name: trino
          namespace: data
        spec:
          type: NodePort
          selector:
            app: trino
          ports:
            - name: http
              port: 8080
              targetPort: 8080
              nodePort: 32082
        ---
        # Single-replica Trino server on nodes labelled
        # fraud.torque.dev/workload=analytics. Every configuration file is mounted
        # individually via subPath, so only the listed files are placed into
        # /etc/trino and /etc/trino/catalog.
        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: trino
          namespace: data
        spec:
          replicas: 1
          selector:
            matchLabels:
              app: trino
          template:
            metadata:
              labels:
                app: trino
            spec:
              nodeSelector:
                fraud.torque.dev/workload: analytics
              volumes:
                # Whole trino-config ConfigMap (server-level files).
                - name: trino-config
                  configMap:
                    name: trino-config
                # Only the ClickHouse catalog file from the same ConfigMap.
                - name: trino-clickhouse-catalog
                  configMap:
                    name: trino-config
                    items:
                      - key: clickhouse.properties
                        path: clickhouse.properties
                # NOTE(review): the trino-iceberg-catalog ConfigMap is not declared
                # in this manifest; it is presumably created earlier in the script.
                - name: trino-iceberg-catalog
                  configMap:
                    name: trino-iceberg-catalog
              containers:
                - name: trino
                  image: trinodb/trino:481
                  envFrom:
                    - secretRef:
                        name: aws-s3
                  ports:
                    - containerPort: 8080
                      name: http
                  volumeMounts:
                    - name: trino-config
                      mountPath: /etc/trino/config.properties
                      subPath: config.properties
                    - name: trino-config
                      mountPath: /etc/trino/node.properties
                      subPath: node.properties
                    - name: trino-config
                      mountPath: /etc/trino/jvm.config
                      subPath: jvm.config
                    - name: trino-config
                      mountPath: /etc/trino/log.properties
                      subPath: log.properties
                    - name: trino-clickhouse-catalog
                      mountPath: /etc/trino/catalog/clickhouse.properties
                      subPath: clickhouse.properties
                    - name: trino-iceberg-catalog
                      mountPath: /etc/trino/catalog/iceberg.properties
                      subPath: iceberg.properties
                  resources:
                    requests:
                      cpu: 250m
                      memory: 768Mi
                    limits:
                      cpu: "1"
                      memory: 1200Mi
                  # Ready once /v1/info answers on the HTTP port.
                  readinessProbe:
                    httpGet:
                      path: /v1/info
                      port: 8080
                    initialDelaySeconds: 30
                    periodSeconds: 5
        ---
        # NodePort Service for the Ray head pod (app=ray-head) in the ml namespace.
        # The dashboard (8265) and client (10001) ports are pinned to node ports
        # 32665 and 32001; the gcs (6379) and serve (8000) ports carry no explicit
        # nodePort.
        apiVersion: v1
        kind: Service
        metadata:
          name: ray-head
          namespace: ml
        spec:
          type: NodePort
          selector:
            app: ray-head
          ports:
            - name: gcs
              port: 6379
              targetPort: 6379
            - name: dashboard
              port: 8265
              targetPort: 8265
              nodePort: 32665
            - name: client
              port: 10001
              targetPort: 10001
              nodePort: 32001
            - name: serve
              port: 8000
              targetPort: 8000
        ---
        # Ray head node, one replica with the Recreate strategy, on nodes labelled
        # fraud.torque.dev/workload=mlbatch. The model code comes from the
        # ray-model-code ConfigMap mounted at /opt/app.
        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: ray-head
          namespace: ml
        spec:
          replicas: 1
          strategy:
            type: Recreate
          selector:
            matchLabels:
              app: ray-head
          template:
            metadata:
              labels:
                app: ray-head
            spec:
              nodeSelector:
                fraud.torque.dev/workload: mlbatch
              volumes:
                - name: ray-model-code
                  configMap:
                    name: ray-model-code
              containers:
                - name: ray
                  image: rayproject/ray:2.55.1-py312-cpu
                  command: ["/bin/bash", "-lc"]
                  # Startup script: start the Ray head process, run the mounted
                  # ray_model.py, then `tail -f /dev/null` so the container keeps
                  # running after those commands return.
                  # NOTE(review): the script has no `set -e`, so a failure of
                  # ray_model.py does not stop the container - confirm intended.
                  args:
                    - |
                      ray start --head --node-ip-address=$(hostname -i) --port=6379 --dashboard-host=0.0.0.0 --dashboard-port=8265 --ray-client-server-port=10001 --num-cpus=1 --object-store-memory=100000000 --disable-usage-stats
                      python /opt/app/ray_model.py
                      exec tail -f /dev/null
                  env:
                    - name: RAY_memory_usage_threshold
                      value: "0.99"
                    - name: MODEL_NAME
                      value: ray-serve-logistic-risk
                    - name: MODEL_VERSION
                      value: v1
                  ports:
                    - containerPort: 6379
                    - containerPort: 8265
                    - containerPort: 10001
                    - containerPort: 8000
                  resources:
                    requests:
                      cpu: 300m
                      memory: 900Mi
                    limits:
                      cpu: "1"
                      memory: 1700Mi
                  volumeMounts:
                    - name: ray-model-code
                      mountPath: /opt/app
        ---
        # Single Ray worker on nodes labelled fraud.torque.dev/workload=control.
        # Its command waits until the ray-head Service name resolves, then joins the
        # cluster at port 6379 and stays in the foreground (--block).
        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: ray-worker
          namespace: ml
        spec:
          replicas: 1
          selector:
            matchLabels:
              app: ray-worker
          template:
            metadata:
              labels:
                app: ray-worker
            spec:
              nodeSelector:
                fraud.torque.dev/workload: control
              containers:
                - name: ray
                  image: rayproject/ray:2.55.1-py312-cpu
                  command: ["/bin/bash", "-lc"]
                  args:
                    - until getent hosts ray-head.ml.svc.cluster.local; do sleep 2; done; ray start --address=ray-head.ml.svc.cluster.local:6379 --num-cpus=1 --object-store-memory=100000000 --disable-usage-stats --block
                  resources:
                    requests:
                      cpu: 300m
                      memory: 500Mi
                    limits:
                      cpu: "1"
                      memory: 900Mi
        ---
        # Smoke-test Job for the Ray Serve scoring endpoint. It POSTs a sample
        # payment to http://ray-head.ml.svc.cluster.local:8000/score up to 120 times,
        # 2 s apart, and succeeds on the first HTTP 200 reply whose body mentions
        # both "fraud_probability" and "model_version". The deployment script waits
        # for this Job to complete.
        apiVersion: batch/v1
        kind: Job
        metadata:
          name: ray-serve-deploy
          namespace: ml
        spec:
          template:
            spec:
              restartPolicy: OnFailure
              containers:
                - name: deploy
                  image: python:3.12-slim
                  command: ["/bin/bash", "-lc"]
                  args:
                    - |
                      python - <<'PY'
                      import json
                      import time
                      import urllib.request

                      payload = json.dumps({
                          "amount": 900,
                          "velocity_5m": 6,
                          "country": "US",
                          "billing_country": "CA",
                          "merchant_category": "travel",
                      }).encode()
                      # Pre-bound so the final error message is always defined, even
                      # when no attempt raised an exception.
                      last = None
                      for _ in range(120):
                          req = urllib.request.Request(
                              "http://ray-head.ml.svc.cluster.local:8000/score",
                              data=payload,
                              headers={"content-type": "application/json"},
                              method="POST",
                          )
                          # The try block must sit at the loop-body indentation: one
                          # level deeper is an IndentationError that stops the script
                          # before it runs at all.
                          try:
                              with urllib.request.urlopen(req, timeout=3) as resp:
                                  body = resp.read().decode()
                                  if resp.status == 200 and "fraud_probability" in body and "model_version" in body:
                                      print(body, flush=True)
                                      # SystemExit is not an Exception subclass, so the
                                      # handler below does not swallow it.
                                      raise SystemExit(0)
                                  last = f"unexpected response (HTTP {resp.status}): {body[:200]}"
                          except Exception as exc:
                              last = exc
                          time.sleep(2)
                      raise RuntimeError(f"ray serve score endpoint not ready: {last}")
                      PY
        ---
        # NodePort Service for the Spark master (app=spark-master) in the ml
        # namespace: the master port 7077 for workers, and the web UI on 8080
        # published on node port 32080.
        apiVersion: v1
        kind: Service
        metadata:
          name: spark-master
          namespace: ml
        spec:
          type: NodePort
          selector:
            app: spark-master
          ports:
            - name: master
              port: 7077
              targetPort: 7077
            - name: ui
              port: 8080
              targetPort: 8080
              nodePort: 32080
        ---
        # Spark standalone master, one replica, on nodes labelled
        # fraud.torque.dev/workload=mlbatch.
        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: spark-master
          namespace: ml
        spec:
          replicas: 1
          selector:
            matchLabels:
              app: spark-master
          template:
            metadata:
              labels:
                app: spark-master
            spec:
              # The pod hostname matches the --host value passed to the master.
              hostname: spark-master
              nodeSelector:
                fraud.torque.dev/workload: mlbatch
              containers:
                - name: spark-master
                  image: apache/spark:4.1.2-scala2.13-java17-python3-ubuntu
                  command: ["/opt/spark/bin/spark-class"]
                  args: ["org.apache.spark.deploy.master.Master", "--host", "spark-master", "--port", "7077", "--webui-port", "8080"]
                  env:
                    - name: SPARK_DAEMON_MEMORY
                      value: 384m
                    - name: SPARK_MASTER_HOST
                      value: spark-master
                    - name: SPARK_MASTER_PORT
                      value: "7077"
                  ports:
                    - containerPort: 7077
                    - containerPort: 8080
                  resources:
                    requests:
                      cpu: 200m
                      memory: 384Mi
                    limits:
                      cpu: "1"
                      memory: 768Mi
        ---
        # Spark standalone worker, one replica, on nodes labelled
        # fraud.torque.dev/workload=processing. It registers with the master through
        # the spark-master Service and offers 1 core and 512m to executors; its own
        # web UI listens on 8081.
        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: spark-worker
          namespace: ml
        spec:
          replicas: 1
          selector:
            matchLabels:
              app: spark-worker
          template:
            metadata:
              labels:
                app: spark-worker
            spec:
              nodeSelector:
                fraud.torque.dev/workload: processing
              containers:
                - name: spark-worker
                  image: apache/spark:4.1.2-scala2.13-java17-python3-ubuntu
                  command: ["/opt/spark/bin/spark-class"]
                  args: ["org.apache.spark.deploy.worker.Worker", "spark://spark-master.ml.svc.cluster.local:7077", "--cores", "1", "--memory", "512m", "--webui-port", "8081"]
                  env:
                    - name: SPARK_DAEMON_MEMORY
                      value: 256m
                  resources:
                    requests:
                      cpu: 200m
                      memory: 384Mi
                    limits:
                      cpu: "1"
                      memory: 768Mi
        ---
        # NodePort Service for the Flink JobManager in the stream namespace. It
        # selects pods labelled app=flink,component=jobmanager and exposes the rpc
        # (6123) and blob (6124) ports plus the web UI (8081), the latter published
        # on node port 32081.
        apiVersion: v1
        kind: Service
        metadata:
          name: flink-jobmanager
          namespace: stream
        spec:
          type: NodePort
          selector:
            app: flink
            component: jobmanager
          ports:
            - name: rpc
              port: 6123
              targetPort: 6123
            - name: blob
              port: 6124
              targetPort: 6124
            - name: ui
              port: 8081
              targetPort: 8081
              nodePort: 32081
        ---
        # Flink JobManager, one replica, on nodes labelled
        # fraud.torque.dev/workload=control.
        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: flink-jobmanager
          namespace: stream
        spec:
          replicas: 1
          selector:
            matchLabels:
              app: flink
              component: jobmanager
          template:
            metadata:
              labels:
                app: flink
                component: jobmanager
            spec:
              nodeSelector:
                fraud.torque.dev/workload: control
              containers:
                - name: jobmanager
                  image: flink:1.20.1-scala_2.12-java17
                  # Runs as root, presumably so the startup command can write the
                  # downloaded jar into /opt/flink/lib - confirm.
                  securityContext:
                    runAsUser: 0
                  command: ["/bin/bash", "-lc"]
                  # Download the Kafka SQL connector from Maven Central into Flink's
                  # lib directory, then hand over to the image's entrypoint in
                  # jobmanager mode. The container does not start if the download
                  # fails (`&&`).
                  args:
                    - wget -q -O /opt/flink/lib/flink-sql-connector-kafka-3.3.0-1.20.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.20/flink-sql-connector-kafka-3.3.0-1.20.jar && exec /docker-entrypoint.sh jobmanager
                  env:
                    - name: JOB_MANAGER_RPC_ADDRESS
                      value: flink-jobmanager
                    # Same property set as the flink-taskmanager Deployment.
                    - name: FLINK_PROPERTIES
                      value: |
                        jobmanager.rpc.address: flink-jobmanager
                        jobmanager.memory.process.size: 1024m
                        taskmanager.memory.process.size: 1200m
                        taskmanager.memory.task.heap.size: 256m
                        taskmanager.memory.managed.size: 128m
                        taskmanager.numberOfTaskSlots: 1
                        parallelism.default: 1
                  ports:
                    - containerPort: 6123
                    - containerPort: 8081
                  resources:
                    requests:
                      cpu: 200m
                      memory: 512Mi
                    limits:
                      cpu: "1"
                      memory: 1200Mi
        ---
        # Flink TaskManager, one replica with a single task slot, on nodes labelled
        # fraud.torque.dev/workload=processing. It locates the JobManager through
        # the flink-jobmanager Service name.
        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: flink-taskmanager
          namespace: stream
        spec:
          replicas: 1
          selector:
            matchLabels:
              app: flink
              component: taskmanager
          template:
            metadata:
              labels:
                app: flink
                component: taskmanager
            spec:
              nodeSelector:
                fraud.torque.dev/workload: processing
              containers:
                - name: taskmanager
                  image: flink:1.20.1-scala_2.12-java17
                  # Runs as root, presumably so the startup command can write the
                  # downloaded jar into /opt/flink/lib - confirm.
                  securityContext:
                    runAsUser: 0
                  command: ["/bin/bash", "-lc"]
                  # Same connector download as the JobManager, then the image's
                  # entrypoint in taskmanager mode.
                  args:
                    - wget -q -O /opt/flink/lib/flink-sql-connector-kafka-3.3.0-1.20.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.20/flink-sql-connector-kafka-3.3.0-1.20.jar && exec /docker-entrypoint.sh taskmanager
                  env:
                    - name: JOB_MANAGER_RPC_ADDRESS
                      value: flink-jobmanager
                    # taskmanager.memory.process.size (1200m) stays below the 1400Mi
                    # container memory limit declared further down.
                    - name: FLINK_PROPERTIES
                      value: |
                        jobmanager.rpc.address: flink-jobmanager
                        jobmanager.memory.process.size: 1024m
                        taskmanager.memory.process.size: 1200m
                        taskmanager.memory.task.heap.size: 256m
                        taskmanager.memory.managed.size: 128m
                        taskmanager.numberOfTaskSlots: 1
                        parallelism.default: 1
                  resources:
                    requests:
                      cpu: 200m
                      memory: 900Mi
                    limits:
                      cpu: "1"
                      memory: 1400Mi
        ---
        # NodePort Service for the payments API (app=payments-api) in the apps
        # namespace: port 8080 published on node port 30080.
        apiVersion: v1
        kind: Service
        metadata:
          name: payments-api
          namespace: apps
        spec:
          type: NodePort
          selector:
            app: payments-api
          ports:
            - name: http
              port: 8080
              targetPort: 8080
              nodePort: 30080
        ---
        # Payments API, one replica, on nodes labelled
        # fraud.torque.dev/workload=events. The application code is not baked into
        # an image: it is mounted from the payments-api-code ConfigMap at /opt/app
        # and served by uvicorn from a stock python image.
        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: payments-api
          namespace: apps
        spec:
          replicas: 1
          selector:
            matchLabels:
              app: payments-api
          template:
            metadata:
              labels:
                app: payments-api
            spec:
              nodeSelector:
                fraud.torque.dev/workload: events
              volumes:
                - name: payments-api-code
                  configMap:
                    name: payments-api-code
              containers:
                - name: api
                  image: python:3.12-slim
                  command: ["/bin/bash", "-lc"]
                  # Dependencies are installed with pip on every container start
                  # (output goes to /tmp/pip.log); uvicorn starts only if the
                  # install succeeds.
                  args:
                    - python -m pip install --no-cache-dir fastapi uvicorn boto3 kafka-python requests clickhouse-connect opentelemetry-sdk opentelemetry-exporter-otlp-proto-http >/tmp/pip.log 2>&1 && exec uvicorn payment_api:app --host 0.0.0.0 --port 8080 --app-dir /opt/app
                  envFrom:
                    - secretRef:
                        name: aws-s3
                  # Endpoints of the other platform services, addressed by their
                  # in-cluster DNS names.
                  # NOTE(review): CLICKHOUSE_PASSWORD is a plain-text value here;
                  # consider sourcing it from a Secret.
                  env:
                    - name: KAFKA_BOOTSTRAP
                      value: redpanda.data.svc.cluster.local:9092
                    - name: RAY_SCORE_URL
                      value: http://ray-head.ml.svc.cluster.local:8000/score
                    - name: CLICKHOUSE_HOST
                      value: signoz-clickhouse.observability.svc.cluster.local
                    - name: CLICKHOUSE_PASSWORD
                      value: torque-clickhouse
                    - name: OTEL_SERVICE_NAME
                      value: payments-api
                    - name: OTEL_EXPORTER_OTLP_ENDPOINT
                      value: http://signoz-otel-collector.observability.svc.cluster.local:4318
                  ports:
                    - containerPort: 8080
                  volumeMounts:
                    - name: payments-api-code
                      mountPath: /opt/app
                  resources:
                    requests:
                      cpu: 150m
                      memory: 256Mi
                    limits:
                      cpu: "1"
                      memory: 768Mi
                  # Ready once /healthz answers; the 30 s initial delay leaves time
                  # for the pip install above.
                  readinessProbe:
                    httpGet:
                      path: /healthz
                      port: 8080
                    initialDelaySeconds: 30
                    periodSeconds: 5
        ---
        # Synthetic traffic generator, one replica, on nodes labelled
        # fraud.torque.dev/workload=events. It runs generator.py from the
        # payments-generator-code ConfigMap and is pointed at the payments-api
        # Service through PAYMENTS_API_URL.
        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: payments-generator
          namespace: apps
        spec:
          replicas: 1
          selector:
            matchLabels:
              app: payments-generator
          template:
            metadata:
              labels:
                app: payments-generator
            spec:
              nodeSelector:
                fraud.torque.dev/workload: events
              volumes:
                - name: payments-generator-code
                  configMap:
                    name: payments-generator-code
              containers:
                - name: generator
                  image: python:3.12-slim
                  command: ["/bin/bash", "-lc"]
                  # `requests` is installed on every container start; the
                  # generator runs only if the install succeeds.
                  args:
                    - python -m pip install --no-cache-dir requests >/tmp/pip.log 2>&1 && exec python /opt/app/generator.py
                  # The batch size and interval are read by generator.py, which is
                  # not visible here - presumably 3 events every 10 seconds.
                  env:
                    - name: PAYMENTS_API_URL
                      value: http://payments-api.apps.svc.cluster.local:8080
                    - name: GENERATOR_BATCH_SIZE
                      value: "3"
                    - name: GENERATOR_INTERVAL_SECONDS
                      value: "10"
                  volumeMounts:
                    - name: payments-generator-code
                      mountPath: /opt/app
                  resources:
                    requests:
                      cpu: 50m
                      memory: 96Mi
                    limits:
                      cpu: 300m
                      memory: 256Mi
        YAML

        # Restart the Deployments that mount their code from ConfigMaps, presumably
        # so that running pods pick up the ConfigMap contents applied above.
        k -n apps rollout restart deployment/payments-api deployment/payments-generator
        k -n ml rollout restart deployment/ray-head

        # Readiness gate: each command blocks until its workload reaches the stated
        # condition or its --timeout expires. The order is kept as written: the
        # apps rollouts first, then the data plane (broker pod, its two one-shot
        # Jobs, catalog, Trino), then Ray and its smoke-test Job, Spark, and Flink.
        k -n apps rollout status deployment/payments-api --timeout=10m
        k -n apps rollout status deployment/payments-generator --timeout=10m
        k -n data wait --for=condition=Ready pod/redpanda-0 --timeout=10m
        k -n data wait --for=condition=complete job/redpanda-topic-init --timeout=5m
        k -n data wait --for=condition=complete job/fraud-data-contracts --timeout=5m
        k -n data rollout status deployment/iceberg-rest --timeout=10m
        k -n data rollout status deployment/trino --timeout=10m
        k -n ml rollout status deployment/ray-head --timeout=10m
        k -n ml rollout status deployment/ray-worker --timeout=10m
        k -n ml wait --for=condition=complete job/ray-serve-deploy --timeout=10m
        k -n ml rollout status deployment/spark-master --timeout=10m
        k -n ml rollout status deployment/spark-worker --timeout=10m
        k -n stream rollout status deployment/flink-jobmanager --timeout=10m
        k -n stream rollout status deployment/flink-taskmanager --timeout=10m

        jm="$(k -n stream get pod -l app=flink,component=jobmanager -o jsonpath='{.items[0].metadata.name}')"
        sql="$(mktemp)"
        trap 'rm -f "${sql}"' EXIT
        cat >"${sql}" <<'SQL'
        CREATE TABLE raw_payments (
          event_id STRING,
          event_ts STRING,
          user_id STRING,
          account_id STRING,
          amount DOUBLE,
          merchant_id STRING,
          merchant_name STRING,
          merchant_category STRING,
          country STRING,
          billing_country STRING,
          device_type STRING,
          device_id STRING,
          velocity_5m INT,
          is_fraud_label INT
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'payments.raw',
          'properties.bootstrap.servers' = 'redpanda.data.svc.cluster.local:9092',
          'properties.group.id' = 'flink-risk-rules',
          'scan.startup.mode' = 'earliest-offset',
          'scan.bounded.mode' = 'latest-offset',
          'format' = 'json',
          'json.ignore-parse-errors' = 'true'
        );
        CREATE TABLE risk_events (
          event_id STRING,
          event_ts STRING,
          user_id STRING,
          amount DOUBLE,
          merchant_category STRING,
          country STRING,
          billing_country STRING,
          velocity_5m INT,
          rule_risk DOUBLE,
          rule_decision STRING
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'payments.risk',
          'properties.bootstrap.servers' = 'redpanda.data.svc.cluster.local:9092',
          'format' = 'json'
        );
        INSERT INTO risk_events
        SELECT
          event_id,
          event_ts,
          user_id,
          amount,
          merchant_category,
          country,
          billing_country,
          velocity_5m,
          CASE WHEN amount >= 800 OR velocity_5m >= 5 OR country <> billing_country THEN 1.0 ELSE 0.0 END,
          CASE WHEN amount >= 800 OR velocity_5m >= 5 OR country <> billing_country THEN 'review' ELSE 'approve' END
        FROM raw_payments;
        SQL
        k -n stream exec -i "${jm}" -- /bin/bash -lc 'cat >/tmp/risk.sql' <"${sql}"
        k -n stream exec "${jm}" -- /opt/flink/bin/sql-client.sh -f /tmp/risk.sql
        k -n apps rollout status deployment/payments-api --timeout=15m
        k -n apps rollout status deployment/payments-generator --timeout=5m
        k get pods -A -o wide
      deleteCommand: |
        set +e
        KUBECONFIG_PATH="${TORQUE_FRAUD_KUBECONFIG:-/tmp/torque-fraud-platform.kubeconfig}"
        kubectl --kubeconfig "${KUBECONFIG_PATH}" delete namespace apps data stream ml --ignore-not-found --timeout=180s

  # Runs the generated lab script on the SSH target in `public-apply` mode once
  # workloads-install has finished, and in `public-delete` mode on teardown.
  # NOTE(review): presumably these modes add/remove the PUBLIC_RULES port
  # forwards defined in the lab script — confirm against fraud-k3s-lab.sh.
  - name: public-access
    kind: host.command.run
    needs: [workloads-install]
    host:
      transport: ssh
      targetEnv: TORQUE_LAB_SSH
      timeout: 5m
      command: |
        set -euo pipefail
        lab_root="/var/lib/torque-firecracker-k8s/fraud-platform"
        # The lab script takes its working directory from RUN_ROOT in its environment.
        RUN_ROOT="${lab_root}" "${lab_root}/fraud-k3s-lab.sh" public-apply
      deleteCommand: |
        set +e
        lab_root="/var/lib/torque-firecracker-k8s/fraud-platform"
        lab_script="${lab_root}/fraud-k3s-lab.sh"
        # Teardown is best-effort: skip quietly when the script was never written.
        if [ -x "${lab_script}" ]; then
          RUN_ROOT="${lab_root}" "${lab_script}" public-delete
        fi

  # Seeds traffic through the public payments endpoint, submits an Argo
  # Workflow that runs /opt/app/spark_batch.py with spark-submit against the
  # in-cluster Spark master, and polls the workflow until it reaches a
  # terminal phase. Runs locally and requires TORQUE_LAB_PUBLIC_IP; the
  # kubeconfig path comes from TORQUE_FRAUD_KUBECONFIG (default under /tmp).
  # The node fails when the workflow ends in Failed/Error or when it has not
  # reached Succeeded after the polling window (180 polls, 5 s apart).
  - name: argo-spark-batch
    kind: host.command.run
    needs: [public-access]
    host:
      transport: local
      timeout: 20m
      command: |
        set -euo pipefail
        KUBECONFIG_PATH="${TORQUE_FRAUD_KUBECONFIG:-/tmp/torque-fraud-platform.kubeconfig}"
        HOST_PUBLIC_IP="${TORQUE_LAB_PUBLIC_IP:?set TORQUE_LAB_PUBLIC_IP to the public host IP}"
        k() { kubectl --kubeconfig "${KUBECONFIG_PATH}" "$@"; }
        # Generate a batch of payments so the Spark job has input to process.
        curl -fsS -X POST "http://${HOST_PUBLIC_IP}:30080/generate?count=50&fraud_rate=0.18"
        # The here-doc delimiter is quoted, so every ${...} below is expanded
        # inside the workflow container, not by this local shell.
        wf="$(k -n argo create -f - -o jsonpath='{.metadata.name}' <<'YAML'
        apiVersion: argoproj.io/v1alpha1
        kind: Workflow
        metadata:
          generateName: fraud-spark-batch-
        spec:
          entrypoint: spark-batch
          serviceAccountName: argo
          volumes:
            - name: spark-batch-code
              configMap:
                name: spark-batch-code
          templates:
            - name: spark-batch
              container:
                image: apache/spark:4.1.2-scala2.13-java17-python3-ubuntu
                command: ["/bin/bash", "-lc"]
                args:
                  - |
                    export HOME=/tmp
                    mkdir -p /tmp/.ivy2
                    python3 -m pip install --no-cache-dir boto3 clickhouse-connect
                    aws_region="${AWS_REGION:-${AWS_DEFAULT_REGION:-us-east-1}}"
                    spark_extra=(
                      --conf "spark.jars.ivy=/tmp/.ivy2"
                      --conf "spark.driver.extraJavaOptions=-Daws.region=${aws_region}"
                      --conf "spark.executor.extraJavaOptions=-Daws.region=${aws_region}"
                      --conf "spark.executorEnv.AWS_REGION=${aws_region}"
                      --conf "spark.executorEnv.AWS_DEFAULT_REGION=${aws_region}"
                      --conf "spark.executorEnv.AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}"
                      --conf "spark.executorEnv.AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}"
                    )
                    if [[ -n "${AWS_SESSION_TOKEN:-}" ]]; then
                      spark_extra+=(--conf "spark.executorEnv.AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN}")
                    fi
                    /opt/spark/bin/spark-submit \
                      --packages org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1 \
                      --master spark://spark-master.ml.svc.cluster.local:7077 \
                      "${spark_extra[@]}" \
                      --conf spark.driver.bindAddress=0.0.0.0 \
                      --conf spark.driver.host=$(hostname -i) \
                      --conf spark.executor.memory=512m \
                      --conf spark.driver.memory=512m \
                      /opt/app/spark_batch.py
                envFrom:
                  - secretRef:
                      name: aws-s3
                env:
                  - name: CLICKHOUSE_HOST
                    value: signoz-clickhouse.observability.svc.cluster.local
                  - name: CLICKHOUSE_PASSWORD
                    value: torque-clickhouse
                  - name: ICEBERG_REST_URI
                    value: http://iceberg-rest.data.svc.cluster.local:8181
                volumeMounts:
                  - name: spark-batch-code
                    mountPath: /opt/app
                resources:
                  requests:
                    cpu: 300m
                    memory: 768Mi
                  limits:
                    cpu: "1"
                    memory: 1400Mi
        YAML
        )"
        # Poll for a terminal phase. A transient lookup failure yields an empty
        # phase and simply counts as one more attempt.
        phase=""
        for attempt in $(seq 1 180); do
          phase="$(k -n argo get workflow "${wf}" -o jsonpath='{.status.phase}' 2>/dev/null || true)"
          echo "workflow=${wf} phase=${phase}"
          [[ "${phase}" == "Succeeded" ]] && break
          [[ "${phase}" == "Failed" || "${phase}" == "Error" ]] && { k -n argo get workflow "${wf}" -o yaml; exit 1; }
          sleep 5
        done
        # Running out of attempts must not be reported as success: anything
        # other than Succeeded at this point is a timeout.
        if [[ "${phase}" != "Succeeded" ]]; then
          echo "workflow ${wf} did not reach Succeeded before the polling window ended (last phase=${phase:-unknown})" >&2
          k -n argo get workflow "${wf}" -o yaml || true
          exit 1
        fi
        k -n argo get workflow "${wf}" -o wide
      deleteCommand: "true"

  # Replays the pipeline as a tagged backfill: samples one record from the
  # start of payments.raw to prove the topic is replayable, submits an Argo
  # Workflow that reruns /opt/app/spark_batch.py with BACKFILL_RUN_ID set to a
  # fresh UTC-timestamped id, waits for the workflow to succeed, and then
  # requires Trino to see at least one iceberg.fraud.batch_feature_summary row
  # for that run id. The node fails when the workflow ends in Failed/Error or
  # has not reached Succeeded after the polling window (180 polls, 5 s apart).
  - name: replay-backfill
    kind: host.command.run
    needs: [argo-spark-batch]
    host:
      transport: local
      timeout: 20m
      command: |
        set -euo pipefail
        KUBECONFIG_PATH="${TORQUE_FRAUD_KUBECONFIG:-/tmp/torque-fraud-platform.kubeconfig}"
        k() { kubectl --kubeconfig "${KUBECONFIG_PATH}" "$@"; }

        redpanda_pod="$(k -n data get pod -l app=redpanda -o jsonpath='{.items[0].metadata.name}')"
        k -n data exec "${redpanda_pod}" -- rpk topic consume payments.raw -n 1 --offset start -X brokers=redpanda.data.svc.cluster.local:9092 >/tmp/torque-fraud-replay-raw.json
        test -s /tmp/torque-fraud-replay-raw.json

        run_id="replay-$(date -u +%Y%m%d%H%M%S)"
        # The here-doc delimiter is unquoted so ${run_id} is expanded locally;
        # every \$ is escaped so it is expanded inside the container instead.
        # Backslash-newline is removed by an unquoted here-doc, so the
        # spark-submit continuation lines reach the cluster joined on one line.
        wf="$(
          k -n argo create -f - -o jsonpath='{.metadata.name}' <<YAML
        apiVersion: argoproj.io/v1alpha1
        kind: Workflow
        metadata:
          generateName: fraud-replay-backfill-
        spec:
          entrypoint: spark-backfill
          serviceAccountName: argo
          volumes:
            - name: spark-batch-code
              configMap:
                name: spark-batch-code
          templates:
            - name: spark-backfill
              container:
                image: apache/spark:4.1.2-scala2.13-java17-python3-ubuntu
                command: ["/bin/bash", "-lc"]
                args:
                  - |
                    export HOME=/tmp
                    mkdir -p /tmp/.ivy2
                    python3 -m pip install --no-cache-dir boto3 clickhouse-connect
                    aws_region="\${AWS_REGION:-\${AWS_DEFAULT_REGION:-us-east-1}}"
                    spark_extra=(
                      --conf "spark.jars.ivy=/tmp/.ivy2"
                      --conf "spark.driver.extraJavaOptions=-Daws.region=\${aws_region}"
                      --conf "spark.executor.extraJavaOptions=-Daws.region=\${aws_region}"
                      --conf "spark.executorEnv.AWS_REGION=\${aws_region}"
                      --conf "spark.executorEnv.AWS_DEFAULT_REGION=\${aws_region}"
                      --conf "spark.executorEnv.AWS_ACCESS_KEY_ID=\${AWS_ACCESS_KEY_ID}"
                      --conf "spark.executorEnv.AWS_SECRET_ACCESS_KEY=\${AWS_SECRET_ACCESS_KEY}"
                    )
                    if [[ -n "\${AWS_SESSION_TOKEN:-}" ]]; then
                      spark_extra+=(--conf "spark.executorEnv.AWS_SESSION_TOKEN=\${AWS_SESSION_TOKEN}")
                    fi
                    /opt/spark/bin/spark-submit \
                      --packages org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1 \
                      --master spark://spark-master.ml.svc.cluster.local:7077 \
                      "\${spark_extra[@]}" \
                      --conf spark.driver.bindAddress=0.0.0.0 \
                      --conf spark.driver.host=\$(hostname -i) \
                      --conf spark.executor.memory=512m \
                      --conf spark.driver.memory=512m \
                      /opt/app/spark_batch.py
                envFrom:
                  - secretRef:
                      name: aws-s3
                env:
                  - name: BACKFILL_RUN_ID
                    value: ${run_id}
                  - name: RAW_OBJECT_LIMIT
                    value: "500"
                  - name: DECISION_OBJECT_LIMIT
                    value: "500"
                  - name: CLICKHOUSE_HOST
                    value: signoz-clickhouse.observability.svc.cluster.local
                  - name: CLICKHOUSE_PASSWORD
                    value: torque-clickhouse
                  - name: ICEBERG_REST_URI
                    value: http://iceberg-rest.data.svc.cluster.local:8181
                volumeMounts:
                  - name: spark-batch-code
                    mountPath: /opt/app
                resources:
                  requests:
                    cpu: 300m
                    memory: 768Mi
                  limits:
                    cpu: "1"
                    memory: 1400Mi
        YAML
        )"
        # Poll for a terminal phase. A transient lookup failure yields an empty
        # phase and simply counts as one more attempt.
        phase=""
        for attempt in $(seq 1 180); do
          phase="$(k -n argo get workflow "${wf}" -o jsonpath='{.status.phase}' 2>/dev/null || true)"
          echo "workflow=${wf} phase=${phase}"
          [[ "${phase}" == "Succeeded" ]] && break
          [[ "${phase}" == "Failed" || "${phase}" == "Error" ]] && { k -n argo get workflow "${wf}" -o yaml; exit 1; }
          sleep 5
        done
        # Stop here on a timeout: without this the row-count check below would
        # run against a workflow that is still in flight.
        if [[ "${phase}" != "Succeeded" ]]; then
          echo "workflow ${wf} did not reach Succeeded before the polling window ended (last phase=${phase:-unknown})" >&2
          k -n argo get workflow "${wf}" -o yaml || true
          exit 1
        fi
        k -n argo get workflow "${wf}" -o wide

        # Confirm the backfill actually landed rows tagged with this run id.
        trino_pod="$(k -n data get pod -l app=trino -o jsonpath='{.items[0].metadata.name}')"
        backfill_rows="$(
          k -n data exec "${trino_pod}" -- trino \
            --server http://localhost:8080 \
            --catalog iceberg \
            --schema fraud \
            --output-format CSV \
            --execute "SELECT count(*) FROM batch_feature_summary WHERE run_id = '${run_id}'" |
            tr -d '\r"' |
            awk -F, '/^[0-9]+/{value=$1} END{print value+0}'
        )"
        [[ "${backfill_rows:-0}" -gt 0 ]] || { echo "no Iceberg backfill rows for run_id=${run_id}" >&2; exit 1; }
        echo "replay_raw_sample=$(head -c 300 /tmp/torque-fraud-replay-raw.json)"
        echo "backfill_run_id=${run_id}"
        echo "iceberg_backfill_batch_features=${backfill_rows}"
      deleteCommand: "true"

  # End-to-end verification, run locally after replay-backfill. Any failing
  # check aborts the node (the script runs under `set -euo pipefail`).
  # Inputs: TORQUE_LAB_PUBLIC_IP (required), TORQUE_FRAUD_KUBECONFIG (optional,
  # defaults to /tmp/torque-fraud-platform.kubeconfig), and the env file
  # /tmp/torque-fraud-s3.env, which is sourced before the checks.
  # NOTE(review): the env file is presumably written by an earlier node —
  # confirm, since sourcing fails the node when it is missing.
  # Checks, in order:
  #   1. HTTP probes on the public IP: 30080 /healthz, 3301 health (falling
  #      back to /), 8265 /api/version, 8080 /json/, 8081 /overview,
  #      8082 /v1/info. NOTE(review): presumably SigNoz, Ray, Spark, Flink and
  #      Trino front ends — confirm against the lab script's PUBLIC_RULES.
  #   2. Python run inside the first payments-api pod counts S3 objects under
  #      raw/payments/ and curated/fraud_features/; both counts must be > 0.
  #   3. Python in the same pod queries ClickHouse; fraud.payment_decisions
  #      and fraud.batch_feature_summary must both be non-empty.
  #   4. Python in the same pod reads the schema registry on redpanda:8081;
  #      the payments.raw/risk/decisions "-value" subjects must exist, and
  #      their compatibility settings are printed.
  #   5. Trino (exec'd in the first trino pod) must return > 0 rows for
  #      payment_decisions via the clickhouse catalog and for raw_payments,
  #      risk_events and batch_feature_summary via the iceberg catalog.
  #   6. `rpk topic describe payments.risk -p` is polled up to 30 times, 2 s
  #      apart, until the sixth column of partition 0's row (used as the high
  #      watermark) is > 0; one message is then consumed into
  #      /tmp/torque-fraud-risk-message.json and must be non-empty.
  # A key=value summary of the collected counts is printed at the end.
  - name: verify-e2e
    kind: host.command.run
    needs: [replay-backfill]
    host:
      transport: local
      timeout: 10m
      command: |
        set -euo pipefail
        KUBECONFIG_PATH="${TORQUE_FRAUD_KUBECONFIG:-/tmp/torque-fraud-platform.kubeconfig}"
        HOST_PUBLIC_IP="${TORQUE_LAB_PUBLIC_IP:?set TORQUE_LAB_PUBLIC_IP to the public host IP}"
        k() { kubectl --kubeconfig "${KUBECONFIG_PATH}" "$@"; }
        . /tmp/torque-fraud-s3.env
        curl -fsS "http://${HOST_PUBLIC_IP}:30080/healthz"
        curl -fsS "http://${HOST_PUBLIC_IP}:3301/api/v1/health?live=1" || curl -fsS "http://${HOST_PUBLIC_IP}:3301/"
        curl -fsS "http://${HOST_PUBLIC_IP}:8265/api/version"
        curl -fsS "http://${HOST_PUBLIC_IP}:8080/json/"
        curl -fsS "http://${HOST_PUBLIC_IP}:8081/overview"
        curl -fsS "http://${HOST_PUBLIC_IP}:8082/v1/info"
        api_pod="$(k -n apps get pod -l app=payments-api -o jsonpath='{.items[0].metadata.name}')"
        s3_counts="$(k -n apps exec -i "${api_pod}" -- python - <<'PY'
        import os
        import boto3

        bucket = os.environ["S3_BUCKET"]
        s3 = boto3.client("s3", region_name=os.getenv("AWS_DEFAULT_REGION", os.getenv("AWS_REGION", "us-east-1")))

        def count(prefix):
            total = 0
            paginator = s3.get_paginator("list_objects_v2")
            for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
                total += len(page.get("Contents", []))
            return total

        print(count("raw/payments/"), count("curated/fraud_features/"))
        PY
        )"
        raw_count="$(awk '{print $1}' <<<"${s3_counts}")"
        curated_count="$(awk '{print $2}' <<<"${s3_counts}")"
        [[ "${raw_count}" -gt 0 ]] || { echo "no raw S3 payment objects" >&2; exit 1; }
        [[ "${curated_count}" -gt 0 ]] || { echo "no curated Spark feature objects" >&2; exit 1; }
        k -n apps exec -i "${api_pod}" -- python - <<'PY'
        import os
        import clickhouse_connect
        client = clickhouse_connect.get_client(
          host=os.getenv("CLICKHOUSE_HOST", "signoz-clickhouse.observability.svc.cluster.local"),
          port=8123,
          username="admin",
          password=os.getenv("CLICKHOUSE_PASSWORD", "torque-clickhouse"),
        )
        decisions = client.query("SELECT count() FROM fraud.payment_decisions").result_rows[0][0]
        batches = client.query("SELECT count() FROM fraud.batch_feature_summary").result_rows[0][0]
        print({"clickhouse_decisions": decisions, "clickhouse_batch_rows": batches})
        if decisions <= 0 or batches <= 0:
            raise SystemExit(1)
        PY
        k -n apps exec -i "${api_pod}" -- python - <<'PY'
        import json
        import urllib.request

        base = "http://redpanda.data.svc.cluster.local:8081"
        subjects = set(json.loads(urllib.request.urlopen(f"{base}/subjects", timeout=5).read()))
        required = {"payments.raw-value", "payments.risk-value", "payments.decisions-value"}
        missing = required - subjects
        if missing:
            raise SystemExit(f"missing schema subjects: {sorted(missing)}")
        compat = {}
        for subject in sorted(required):
            payload = json.loads(urllib.request.urlopen(f"{base}/config/{subject}", timeout=5).read())
            compat[subject] = payload.get("compatibilityLevel", payload.get("compatibility"))
        print({"schema_subjects": sorted(required), "compatibility": compat})
        PY
        trino_pod="$(k -n data get pod -l app=trino -o jsonpath='{.items[0].metadata.name}')"
        trino_count() {
          local catalog="$1"
          local query="$2"
          k -n data exec "${trino_pod}" -- trino \
            --server http://localhost:8080 \
            --catalog "${catalog}" \
            --schema fraud \
            --output-format CSV \
            --execute "${query}" |
            tr -d '\r"' |
            awk -F, '/^[0-9]+/{value=$1} END{print value+0}'
        }
        trino_decisions="$(trino_count clickhouse "SELECT count(*) FROM payment_decisions")"
        [[ "${trino_decisions:-0}" -gt 0 ]] || { echo "Trino could not query ClickHouse payment decisions" >&2; exit 1; }
        iceberg_raw="$(trino_count iceberg "SELECT count(*) FROM raw_payments")"
        iceberg_risk="$(trino_count iceberg "SELECT count(*) FROM risk_events")"
        iceberg_batches="$(trino_count iceberg "SELECT count(*) FROM batch_feature_summary")"
        [[ "${iceberg_raw:-0}" -gt 0 ]] || { echo "Trino could not query Iceberg raw payments" >&2; exit 1; }
        [[ "${iceberg_risk:-0}" -gt 0 ]] || { echo "Trino could not query Iceberg risk events" >&2; exit 1; }
        [[ "${iceberg_batches:-0}" -gt 0 ]] || { echo "Trino could not query Iceberg batch features" >&2; exit 1; }
        redpanda_pod="$(k -n data get pod -l app=redpanda -o jsonpath='{.items[0].metadata.name}')"
        risk_high_watermark=0
        for attempt in $(seq 1 30); do
          risk_high_watermark="$(k -n data exec "${redpanda_pod}" -- rpk topic describe payments.risk -p -X brokers=redpanda.data.svc.cluster.local:9092 | awk '$1=="0"{print $6}')"
          [[ "${risk_high_watermark:-0}" -gt 0 ]] && break
          sleep 2
        done
        [[ "${risk_high_watermark:-0}" -gt 0 ]] || { echo "no Flink risk messages produced" >&2; exit 1; }
        k -n data exec "${redpanda_pod}" -- rpk topic consume payments.risk -n 1 -X brokers=redpanda.data.svc.cluster.local:9092 >/tmp/torque-fraud-risk-message.json
        test -s /tmp/torque-fraud-risk-message.json
        echo "s3_raw_objects=${raw_count}"
        echo "s3_curated_objects=${curated_count}"
        echo "trino_payment_decisions=${trino_decisions}"
        echo "iceberg_raw_payments=${iceberg_raw}"
        echo "iceberg_risk_events=${iceberg_risk}"
        echo "iceberg_batch_features=${iceberg_batches}"
        echo "risk_high_watermark=${risk_high_watermark}"
        echo "risk_message_sample=$(head -c 300 /tmp/torque-fraud-risk-message.json)"
      # Verification creates nothing that needs cleanup, so teardown is a no-op.
      deleteCommand: "true"
