OCI Sync

Pulls an OCI artifact (manifest + layers) from a registry and emits one event per layer. Downstream tasks decide what to do with each layer’s content — parse it, store it, transform it.

Works with any OCI-compliant registry: GHCR, ECR, GAR, ACR, Artifactory, Harbor, Docker Hub, Quay, self-hosted.

Each event contains {path, content, digest, artifact_digest}. The shape mirrors Git Sync so the same downstream pipeline (buffer → diff → cache write) works with either source.

Configuration

- oci_sync:
    name: pull_flows
    artifact: "registry.example.com/your-org/your-flows:prod"
    credentials_path: /etc/flowgen/credentials/registry.json

Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | string | required | Task name. |
| artifact | string | required | Full OCI reference, e.g. registry.example.com/org/flows:prod or registry.example.com/org/flows@sha256:abcd…. |
| credentials_path | string | | Path to a JSON credentials file. Two formats are auto-detected; see credentials. Anonymous auth if omitted. |
| force_pull | bool | false | Bypass the manifest-digest cache and re-pull every tick. Use only to re-seed a downstream cache mutated out of band; leave off in steady state. |
| depends_on | list | | Upstream task names. |
| retry | object | | Retry configuration. |
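
As a minimal sketch of the optional fields, the variant below pins the artifact to an immutable digest and omits credentials_path so the pull is anonymous. The task name and the <digest> placeholder are illustrative assumptions; substitute the real manifest digest of your artifact.

```yaml
- oci_sync:
    name: pull_flows_pinned   # hypothetical task name
    # Immutable reference: <digest> is a placeholder, not a real value.
    artifact: "registry.example.com/your-org/your-flows@sha256:<digest>"
    # credentials_path is omitted, so the registry is accessed anonymously.
```

Anonymous access only works if the registry allows unauthenticated pulls for that repository.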

Example: sync flows from a registry into the NATS KV cache

flow:
  name: oci_sync_flows
  tasks:
    - generate:
        name: trigger
        interval: "30s"

    - oci_sync:
        name: pull_repo
        artifact: "registry.example.com/your-org/your-flows:prod"
        credentials_path: /etc/flowgen/credentials/registry.json

    - buffer:
        name: collect_layers
        size: 10000
        timeout: "5s"

    - script:
        name: write_cache_keys
        code: |
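          // Build one KV "put" action per layer in the buffered batch.
          let actions = [];
          for layer in event.data.batch {
              // Key each entry by the flow name declared inside the layer's YAML.
              let parsed = parse_yaml(layer.content);
              let key = "flowgen.flows." + parsed.flow.name;
              actions.push(#{
                  action: "put",
                  key: key,
                  content: layer.content,
              });
          }
          // The last expression is the script's result.
          actions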

    - nats_kv_store:
        name: save_to_kv
        operation: put
        bucket: flowgen_system
        key: "{{event.data.key}}"
        credentials_path: /etc/nats/credentials.json

Output

Format: JSON. Each emitted layer produces one event whose event.data contains:

| Field | Type | Description |
| --- | --- | --- |
| path | string | File path in the artifact, derived from the layer’s org.opencontainers.image.title annotation set during oras push. Falls back to layer-<index> if the annotation is missing. |
| content | string | Layer blob as UTF-8. Non-UTF-8 layers (e.g. binary blobs) produce an error. |
| digest | string | Layer blob digest (sha256:…). |
| artifact_digest | string | Whole-artifact manifest digest. The same value across all events from one pull. |

Bootstrap flows

Two end-to-end bootstrap flows reconcile an OCI artifact into the system cache. They tick on an interval, list existing cache entries, and emit one put per layer and one delete per orphaned key:

  • examples/oci/system_sync_flows.yaml — keys each entry by flow.name parsed from the layer body so the filename is incidental. The reconciler reads from flowgen.flows.* and starts, stops, and hot-reloads flows accordingly.
  • examples/oci/system_sync_resources.yaml — keys each entry by the layer’s relative path under flowgen.resources.*. The runtime ResourceLoader reads from the same keys when tasks reference resource: <path>. See Resources.

Both skip the rest of their pipeline when the artifact digest has not moved, so the only cost on a no-change tick is a manifest HEAD plus a list_keys round-trip.
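
To illustrate the difference in keying between the two flows, here is a rough sketch of a script task that keys each entry by the layer path instead of by the parsed flow name. It reuses the script pattern from the example earlier on this page and is not the content of examples/oci/system_sync_resources.yaml, which also handles deletes and the digest check. The task name is hypothetical, and the sketch assumes layer.path is already the relative path you want as the key suffix.

```yaml
- script:
    name: write_resource_keys   # hypothetical task name
    code: |
      let actions = [];
      for layer in event.data.batch {
          // Key by the layer's path; the content is stored unparsed.
          let key = "flowgen.resources." + layer.path;
          actions.push(#{
              action: "put",
              key: key,
              content: layer.content,
          });
      }
      actions
```

Because the key comes from the path, renaming a file in the artifact changes its cache key, whereas in the flows variant the filename is incidental.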

Change detection

Each tick issues an HTTP HEAD against the artifact’s manifest URL to read Docker-Content-Digest from the response header. The digest is compared against the last successful pull, cached under flow.{flow_name}.oci_digest.{artifact} in the shared cache. On a match, the layer blobs are not fetched and the source emits only the upstream completion signal — one line per tick in the logs:

INFO flowgen_oci::sync::processor: OCI manifest digest unchanged since last pull, skipping layer fetch artifact=… digest=sha256:…

This works for both mutable tags (:prod) and immutable digests (@sha256:…). The digest is authoritative in either case, so re-tagging :prod to a new release still triggers a pull.

The cached digest is persisted only after every layer event has been sent, so a mid-pull failure causes the next tick to re-emit the full batch.

Set force_pull: true to bypass the cache — use only to re-seed a downstream cache mutated out of band.
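
A re-seed run could look like the sketch below, which takes the configuration from the top of this page and adds force_pull. Treat it as a temporary setting: while it is on, every tick re-fetches and re-emits all layers.

```yaml
- oci_sync:
    name: pull_flows
    artifact: "registry.example.com/your-org/your-flows:prod"
    credentials_path: /etc/flowgen/credentials/registry.json
    # Temporary: ignore the cached manifest digest and re-pull on every tick.
    force_pull: true
```

Once the downstream cache is back in sync, remove the flag (or set it to false) so unchanged ticks return to a single manifest HEAD.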