Salesforce CDC Replication

Replicate Salesforce Change Data Capture events to object storage and a data warehouse using NATS as a durable buffer.

Architecture

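The pipeline is split into two flows connected by a NATS JetStream stream. The subscriber flow reads change events from the Salesforce Pub/Sub API and publishes them to the stream. The writer flow consumes from the stream and writes each event to object storage and the data warehouse.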

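NATS decouples Salesforce event delivery from downstream writes. If the data warehouse or object storage is temporarily unavailable, events stay in the stream, and the writer resumes from its last acknowledged message once the destination recovers. The stream is configured with max_age: "24h" and discard: old, so an outage longer than 24 hours drops the oldest unprocessed events.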
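The following toy in-memory model in Python illustrates the buffering behavior. ToyStream and drain are hypothetical names used for illustration; they are not the NATS client API. The consumer advances its cursor only after the sink succeeds, so a failed write leaves the message pending for the next attempt. The model ignores max_age expiry and per-message redelivery, both of which real JetStream applies.

```python
from dataclasses import dataclass, field


@dataclass
class ToyStream:
    """Toy model of a stream plus one durable consumer (not the NATS API)."""

    messages: list = field(default_factory=list)
    ack_floor: int = 0  # index of the first message not yet acknowledged

    def publish(self, msg) -> None:
        # The subscriber side: append to the stream regardless of sink health.
        self.messages.append(msg)

    def drain(self, sink) -> int:
        """Deliver pending messages in order; stop at the first sink failure."""
        delivered = 0
        while self.ack_floor < len(self.messages):
            try:
                sink(self.messages[self.ack_floor])
            except Exception:
                # Sink is down: keep the cursor where it is and retry later.
                break
            self.ack_floor += 1  # "acknowledge" only after a successful write
            delivered += 1
        return delivered
```

Because publishing never waits on the sink, a destination outage only grows the backlog between ack_floor and the end of the stream. It does not block Salesforce delivery.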

Subscriber flow

Connects to the Salesforce Pub/Sub API and republishes every change event into NATS. Deploy one subscriber flow per CDC topic.

flow:
  name: salesforce_account_subscriber
  require_leader_election: true
  tasks:

    - salesforce_pubsubapi_subscriber:
        name: start_subscription
        credentials_path: /etc/sfdc/credentials.json
        topic:
          name: /data/AccountChangeEvent
          durable_consumer_options:
            enabled: true
            managed_subscription: false
            name: salesforce_account_subscriber

    - nats_jetstream_publisher:
        name: publish_events
        credentials_path: /etc/nats/credentials.json
        url: "{{env.NATS_URL}}"
        subject: pubsub.data.{{event.subject}}
        stream:
          create_or_update: true
          name: salesforce
          description: "Salesforce platform events and change events."
          subjects: ["pubsub.>"]
          max_age: "24h"
          retention: limits
          discard: old

require_leader_election: true ensures only one pod runs the subscriber; Salesforce CDC topics should not have competing consumers.

The NATS subject encodes the event type via {{event.subject}}. The subscriber lowercases the topic name, so AccountChangeEvent lands on pubsub.data.accountchangeevent, OpportunityChangeEvent on pubsub.data.opportunitychangeevent, etc. A single NATS stream with subject filter pubsub.> captures all of them.
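Below is a minimal Python sketch of the subject mapping and the NATS wildcard rules. It assumes the subscriber derives {{event.subject}} by lowercasing the last segment of the topic name, as described above. nats_subject_for_topic and subject_matches are hypothetical helpers. The matching follows NATS subject semantics: * matches exactly one token, and > matches one or more trailing tokens.

```python
def nats_subject_for_topic(topic: str) -> str:
    """Hypothetical helper mirroring the documented mapping:
    '/data/AccountChangeEvent' -> 'pubsub.data.accountchangeevent'."""
    event_name = topic.rstrip("/").rsplit("/", 1)[-1].lower()
    return f"pubsub.data.{event_name}"


def subject_matches(pattern: str, subject: str) -> bool:
    """NATS-style subject matching on '.'-separated tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last pattern token and needs at least one token left.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without '>', pattern and subject must have the same number of tokens.
    return len(p_tokens) == len(s_tokens)
```

For example, pubsub.> captures pubsub.data.accountchangeevent, but pubsub.* does not, because * covers only a single token.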

Writer flow

Consumes from the NATS stream and branches to multiple destinations. Both sinks declare depends_on: [strip_subject_prefix], so flowgen clones every event into both paths and runs them in parallel. The NATS message is acknowledged only after both leaf tasks complete successfully.
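The fan-out and acknowledgement rule can be modeled with asyncio. This is a conceptual sketch, not flowgen's implementation. handle_message, the two stand-in sinks, and the ack callback are all hypothetical. Each branch receives its own copy of the event, and the ack runs only if no branch raised an exception. Otherwise, the message is left unacknowledged so it can be redelivered.

```python
import asyncio
import copy
from typing import Awaitable, Callable, Sequence

Sink = Callable[[dict], Awaitable[None]]


async def handle_message(event: dict, sinks: Sequence[Sink], ack: Callable[[], None]) -> bool:
    """Run every sink in parallel on its own copy; ack only if all succeed."""
    results = await asyncio.gather(
        *(sink(copy.deepcopy(event)) for sink in sinks),
        return_exceptions=True,  # return errors as results so every branch finishes first
    )
    if any(isinstance(r, BaseException) for r in results):
        return False  # no ack: the message stays pending for redelivery
    ack()
    return True


async def write_object_store(event: dict) -> None:
    """Hypothetical stand-in for the object_store branch."""
    event["written_to"] = "object_store"


async def write_warehouse(event: dict) -> None:
    """Hypothetical stand-in for the data warehouse branch."""
    event["written_to"] = "warehouse"
```

Since a failure in either branch withholds the ack, a redelivered message is written again to the branch that had already succeeded. Destinations should therefore tolerate duplicates.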

flow:
  name: salesforce_writer
  tasks:

    - nats_jetstream_subscriber:
        name: start_subscription
        credentials_path: /etc/nats/credentials.json
        url: "{{env.NATS_URL}}"
        subject: pubsub.>
        stream:
          create_or_update: true
          name: salesforce
          description: "Salesforce platform events and change events."
          subjects: ["pubsub.>"]
          max_age: "24h"
          retention: limits
          discard: old
        durable_name: salesforce_writer

    - script:
        name: strip_subject_prefix
        code: |
          let parts = event.subject.split(".");
          if parts.len() >= 3 && parts[0] == "pubsub" && (parts[1] == "data" || parts[1] == "event") {
              event.subject = parts[1] + "/" + parts[2];
          }
          event

    - object_store:
        name: write_to_object_store
        operation: write
        credentials_path: /etc/gcp/credentials.json
        path: "{{env.GCS_BUCKET_PATH}}/salesforce/{{event.subject}}"
        hive_partition_options:
          enabled: true
          partition_keys:
            - EventDate
        depends_on: [strip_subject_prefix]

    - gcp_bigquery_storage_write:
        name: write_to_data_warehouse
        credentials_path: /etc/gcp/credentials.json
        project_id: "{{env.GCP_PROJECT_ID}}"
        dataset_id: salesforce
        table_id: account
        change_type: upsert
        depends_on: [strip_subject_prefix]
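The strip_subject_prefix script turns a NATS subject such as pubsub.data.accountchangeevent into data/accountchangeevent, which then becomes part of the object store path. Below is a Python port of the same checks, useful for testing the mapping outside flowgen. object_path is a hypothetical helper, and the EventDate=<value> directory assumes Hive-style key=value partition naming.

```python
def strip_subject_prefix(subject: str) -> str:
    """Python port of the Rhai script: rewrite pubsub.data.* / pubsub.event.*."""
    parts = subject.split(".")
    if len(parts) >= 3 and parts[0] == "pubsub" and parts[1] in ("data", "event"):
        return parts[1] + "/" + parts[2]
    return subject  # any other subject passes through unchanged


def object_path(bucket_path: str, subject: str, event_date: str) -> str:
    """Hypothetical: assumes one Hive-style EventDate=<value> directory level."""
    return f"{bucket_path}/salesforce/{strip_subject_prefix(subject)}/EventDate={event_date}"
```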

Swapping destinations

Both branches are independent; remove or replace either one without affecting the other.

Object store is cloud-agnostic. The same object_store task works with GCS, S3, and Azure by changing the path prefix and credentials:

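| Cloud  | Path prefix         | Credentials                                     |
|--------|---------------------|-------------------------------------------------|
| GCS    | gs://bucket/path    | GCP service account JSON                        |
| AWS S3 | s3://bucket/path    | AWS access key JSON + client_options.aws_region |
| Azure  | az://container/path | Azure storage account JSON                      |
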
    # AWS S3 example
    - object_store:
        name: write_to_object_store
        operation: write
        credentials_path: /etc/aws/credentials.json
        path: s3://my-bucket/salesforce/{{event.subject}}
        client_options:
          aws_region: us-east-1
        hive_partition_options:
          enabled: true
          partition_keys:
            - EventDate
        depends_on: [strip_subject_prefix]
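Before deploying, a small check can catch a mismatched prefix or a missing region. check_object_store_config is a hypothetical helper built only from the prefix table above. Run it on rendered paths, after environment variables are substituted, because a template string has no URL scheme to parse.

```python
from typing import Optional
from urllib.parse import urlparse

# Path prefixes from the table above, keyed by URL scheme.
PROVIDERS = {"gs": "GCS", "s3": "AWS S3", "az": "Azure"}


def check_object_store_config(path: str, client_options: Optional[dict] = None) -> str:
    """Return the provider for a rendered path; raise on an unsupported setup."""
    scheme = urlparse(path).scheme
    if scheme not in PROVIDERS:
        raise ValueError(f"unsupported path prefix {scheme!r} in {path!r}")
    # Per the table, S3 also needs client_options.aws_region.
    if scheme == "s3" and not (client_options or {}).get("aws_region"):
        raise ValueError("s3:// paths also need client_options.aws_region")
    return PROVIDERS[scheme]
```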

Data warehouse: the example above uses gcp_bigquery_storage_write. Replace it with the appropriate warehouse task for your stack; see the task reference for configuration details.

Credentials

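| File                       | Purpose                | Contents                                                |
|----------------------------|------------------------|---------------------------------------------------------|
| /etc/sfdc/credentials.json | Salesforce Pub/Sub API | client_id, client_secret, username, password, login_url |
| /etc/nats/credentials.json | NATS JetStream         | url, nkey_seed or credentials_path                      |
| /etc/gcp/credentials.json  | GCS + BigQuery         | GCP service account JSON key                            |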

See Credentials for format details.
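The sketch below is a hypothetical pre-flight check for the Salesforce credentials file. It uses only the key names from the table above and treats empty values as missing. It does not validate value formats, which the Credentials page covers.

```python
import json
from pathlib import Path
from typing import Set

# Keys listed for /etc/sfdc/credentials.json in the table above.
SFDC_REQUIRED_KEYS = {"client_id", "client_secret", "username", "password", "login_url"}


def missing_sfdc_keys(raw: str) -> Set[str]:
    """Return required keys that are absent or empty in a credentials JSON string."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("credentials file must contain a JSON object")
    return {key for key in SFDC_REQUIRED_KEYS if not data.get(key)}


def check_sfdc_credentials_file(path: str = "/etc/sfdc/credentials.json") -> None:
    """Raise if the file on disk is missing any required key."""
    missing = missing_sfdc_keys(Path(path).read_text())
    if missing:
        raise ValueError(f"{path} is missing: {', '.join(sorted(missing))}")
```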

Backfilling

To replay from the start of the Salesforce retention window, set replay_preset: earliest on the subscriber:

    - salesforce_pubsubapi_subscriber:
        name: start_subscription
        credentials_path: /etc/sfdc/credentials.json
        topic:
          name: /data/AccountChangeEvent
          durable_consumer_options:
            enabled: true
            managed_subscription: false
            name: salesforce_account_subscriber
            replay_preset: earliest

Once the subscriber catches up, it continues from the latest position. Remove replay_preset after the backfill completes.

Multiple CDC topics

Deploy one subscriber flow per topic. All subscribers publish to the same NATS stream, and the writer consumes from all of them through the pubsub.> wildcard.

The writer routes each event type by {{event.subject}} in the object store path. For per-topic warehouse tables, use a script task to set the table name dynamically or deploy separate writer flows per topic.
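One way to derive per-topic table names is sketched below in Python so the rule is easy to test; the same logic could be ported to a script task. table_for_subject and its naming convention are assumptions, not a documented flowgen feature. The convention strips the changeevent suffix, so a custom object event such as Invoice__ChangeEvent maps to invoice.

```python
from typing import Optional

CHANGE_EVENT_SUFFIX = "changeevent"


def table_for_subject(subject: str) -> Optional[str]:
    """Hypothetical naming rule applied after strip_subject_prefix.

    'data/accountchangeevent'   -> 'account'
    'data/invoice__changeevent' -> 'invoice'
    Returns None for subjects that are not change events.
    """
    kind, _, name = subject.partition("/")
    if kind != "data" or not name.endswith(CHANGE_EVENT_SUFFIX):
        return None
    # Drop the suffix, then the '__' separator left behind by custom objects.
    table = name[: -len(CHANGE_EVENT_SUFFIX)].rstrip("_")
    return table or None
```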

CDC vs Bulk API

|          | CDC Replication          | Bulk API Export               |
|----------|--------------------------|-------------------------------|
| Latency  | Real-time (seconds)      | Scheduled (minutes)           |
| Data     | Individual change events | Full or incremental snapshots |
| Volume   | Per-record               | Batched CSV                   |
| Schema   | Avro (typed)             | CSV (requires schema casting) |
| Use case | Keep warehouse in sync   | Periodic snapshots, backfills |

Both patterns can coexist: use CDC for real-time sync and the Bulk API for periodic full reconciliation. See Salesforce Data Export for the Bulk API pattern.