Configuring domain sync for real-time spatial feeds

When synchronizing real-time spatial feeds across a federated mesh, the primary failure vector is schema drift in vector/tile payloads during cross-domain propagation. Silent coordinate system mismatches (e.g., EPSG:4326 vs EPSG:3857) and topology degradation bypass standard validation layers, corrupting downstream tile generators and routing engines. This implementation enforces strict contract validation at the ingestion boundary, operating within the Domain Sync Protocols for Spatial Data specification. Precise offset management is mandatory to maintain sub-second geospatial routing SLAs and prevent cascading partition rebalances.
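A coordinate range check is one inexpensive way to catch the EPSG:4326 vs EPSG:3857 mismatch at the ingestion boundary. The sketch below is a heuristic under stated assumptions, not part of the specification: the function name `guess_crs` and the sample coordinates are hypothetical, and small EPSG:3857 values cannot be distinguished from degrees by range alone.

```python
# Heuristic only: coordinate ranges cannot prove a CRS, but they can flag
# values that are impossible for EPSG:4326 (degrees).
WEB_MERCATOR_LIMIT = 20037508.342789244  # EPSG:3857 extent in metres


def guess_crs(x: float, y: float) -> str:
    """Return a best-effort CRS label for a single (x, y) position."""
    if abs(x) <= 180 and abs(y) <= 90:
        # Plausible lon/lat; tiny EPSG:3857 values would look the same.
        return "EPSG:4326?"
    if abs(x) <= WEB_MERCATOR_LIMIT and abs(y) <= WEB_MERCATOR_LIMIT:
        return "EPSG:3857?"
    return "unknown"


# Hypothetical sample positions: one in degrees, one in metres.
print(guess_crs(13.4050, 52.5200))
print(guess_crs(1492237.77, 6894699.80))
```

A check like this is best treated as an alarm that triggers a contract review, since it can only rule out a CRS, never confirm one.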

Exact Connector Configuration & Routing Syntax

Deploy the following Kafka Connect distributed worker configuration. The architecture relies on the BACKWARD_TRANSITIVE compatibility level to permit additive field evolution while rejecting structural geometry mutations. Compatibility is set per subject in the Schema Registry, not in the connector configuration, so it does not appear below. The RegexRouter transform rewrites the destination topic name based on the source table, so each captured table maps to its own spatial.sync.* topic.

json
{
  "name": "spatial-feed-sync-prod",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg-primary-geo.internal",
    "database.port": "5432",
    "database.dbname": "mesh_spatial_registry",
    "table.include.list": "public.realtime_feeds,public.vector_tiles",
    "plugin.name": "pgoutput",
    "schema.registry.url": "https://schema-registry.mesh.internal:8081",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://schema-registry.mesh.internal:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "https://schema-registry.mesh.internal:8081",
    "value.converter.schemas.enable": "true",
    "transforms": "routeByEnvelope",
    "transforms.routeByEnvelope.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.routeByEnvelope.regex": "mesh_spatial_registry\\.public\\.(.*)",
    "transforms.routeByEnvelope.replacement": "spatial.sync.$1",
    "offset.flush.timeout.ms": "10000",
    "max.batch.size": "2048",
    "poll.interval.ms": "50",
    "tasks.max": "4",
    "errors.tolerance": "none",
    "errors.deadletterqueue.topic.name": "spatial.dlq.geo",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }
}

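Note: ExtractField$Key extracts a single field from a Kafka record key (a Struct), not from the geometry payload. Despite the alias routeByEnvelope, the RegexRouter transform above rewrites the topic name from the source table only and never inspects geometry. To route by spatial envelope, implement the spatial partitioning logic in a custom Partitioner on the producer side. The Debezium connector does not natively support geometry-based routing transforms.

Configuration caveats: The regex assumes the Debezium topic prefix (topic.prefix in Debezium 2.x, database.server.name in 1.x) is mesh_spatial_registry, which the configuration above does not set explicitly. errors.deadletterqueue.topic.name is honored only by sink connectors, and only when errors.tolerance=all, so this source connector does not write to spatial.dlq.geo; any DLQ traffic must originate from a downstream sink connector. offset.flush.timeout.ms is a worker-level property and is ignored in a connector configuration. The Debezium PostgreSQL connector always runs a single task, so tasks.max=4 has no effect.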
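To preview which topic names the RegexRouter settings would produce, the rewrite can be approximated offline. This is a minimal sketch: `route` is a hypothetical helper, and the input topic names assume Debezium's `<prefix>.<schema>.<table>` naming with the prefix `mesh_spatial_registry`. RegexRouter only rewrites when the entire topic name matches, which `fullmatch` mirrors.

```python
import re

# Same pattern and replacement as the connector config, translated to Python:
# Java's "$1" back-reference becomes "\1".
PATTERN = re.compile(r"mesh_spatial_registry\.public\.(.*)")
REPLACEMENT = r"spatial.sync.\1"


def route(topic: str) -> str:
    """Mimic RegexRouter: rewrite only when the whole topic name matches."""
    match = PATTERN.fullmatch(topic)
    return match.expand(REPLACEMENT) if match else topic


for name in (
    "mesh_spatial_registry.public.realtime_feeds",
    "mesh_spatial_registry.public.vector_tiles",
    "some_other_prefix.public.vector_tiles",  # hypothetical non-matching topic
):
    print(name, "->", route(name))
```

Topics that do not match the pattern pass through unchanged, which is a quick way to spot a prefix mismatch before deployment.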

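Apply via the Kafka Connect REST API. Use PUT for idempotent deployment: it creates the connector if it does not exist and updates its configuration otherwise, without creating duplicates. The PUT /connectors/{name}/config endpoint expects only the flat configuration map, so strip the name/config wrapper with jq before sending:

bash
jq '.config' spatial-sync-config.json \
  | curl -X PUT -H "Content-Type: application/json" \
  --data @- \
  http://connect-cluster.mesh.internal:8083/connectors/spatial-feed-sync-prod/config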
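The same deployment call can be prepared from a script. The sketch below only builds the request and does not send it; `build_put_request` is a hypothetical helper, and the inline dictionary stands in for the full JSON document shown earlier. It sends only the inner `config` map, because the PUT endpoint takes the flat configuration rather than the `name`/`config` wrapper.

```python
import json
import urllib.request


def build_put_request(wrapper: dict, base_url: str) -> urllib.request.Request:
    """Build an idempotent PUT for /connectors/<name>/config."""
    name = wrapper["name"]
    # The PUT endpoint takes the flat config map, not the name/config wrapper.
    body = json.dumps(wrapper["config"]).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Abbreviated stand-in for spatial-sync-config.json.
wrapper = {
    "name": "spatial-feed-sync-prod",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    },
}
request = build_put_request(wrapper, "http://connect-cluster.mesh.internal:8083")
print(request.get_method(), request.full_url)
# urllib.request.urlopen(request) would send it; omitted so the sketch runs offline.
```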

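Verify partition assignment and stability of the downstream consumer group. A source connector has no consumer group of its own, so the group name below is assumed to be the one used by the consumers of the spatial.sync.* topics: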

bash
kafka-consumer-groups \
  --bootstrap-server broker-1.mesh.internal:9092 \
  --describe \
  --group spatial-feed-sync-prod

Idempotent Execution & Offset Management

Spatial routing requires deterministic offset commits to prevent duplicate tile generation or coordinate interpolation gaps. Configure the following operational safeguards:

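  1. Transactional Offset Commits: Set exactly.once.source.support=enabled in the distributed worker configuration (connect-distributed.properties) so that source offsets are written in the same producer transaction as the records they describe. Geometry payloads and their offsets are then committed atomically. This requires Kafka Connect 3.3 or later and a connector version that supports exactly-once delivery.
  2. Flush Timeout Enforcement: offset.flush.timeout.ms=10000 bounds how long the worker waits for outstanding records to flush before committing offsets, so commit attempts cannot block indefinitely during high-throughput ingestion bursts. It is a worker-level property and belongs in connect-distributed.properties. If the timeout is exceeded, the worker cancels that commit attempt, logs an error, and retries in a later commit cycle. The property has no effect for source connectors running with exactly-once support.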
  3. Idempotent Task Restarts: When a connector task fails, restart only the failed task to avoid full connector rebalancing:
    bash
    curl -X POST \
      http://connect-cluster.mesh.internal:8083/connectors/spatial-feed-sync-prod/tasks/0/restart
    
  4. Offset Reset Protocol: Never use --reset-offsets --to-earliest on production spatial topics. Instead, use --reset-offsets --topic <topic>:<partition> --to-offset <offset>, with the offset aligned to the last valid geometry envelope checkpoint.
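The task-level restart in item 3 can be driven from the connector status document. The following sketch assumes the response shape of GET /connectors/<name>/status (a `tasks` array with `id` and `state` fields); the helper names and the sample status are hypothetical, and the code only builds URLs rather than issuing requests.

```python
from typing import List


def failed_task_ids(status: dict) -> List[int]:
    """Return the ids of tasks that the status document reports as FAILED."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def restart_urls(base_url: str, connector: str, status: dict) -> List[str]:
    """Build one POST target per failed task; healthy tasks are left alone."""
    return [
        f"{base_url}/connectors/{connector}/tasks/{task_id}/restart"
        for task_id in failed_task_ids(status)
    ]


# Hypothetical status document for illustration.
sample_status = {
    "name": "spatial-feed-sync-prod",
    "connector": {"state": "RUNNING"},
    "tasks": [
        {"id": 0, "state": "RUNNING"},
        {"id": 1, "state": "FAILED"},
    ],
}
for url in restart_urls(
    "http://connect-cluster.mesh.internal:8083", "spatial-feed-sync-prod", sample_status
):
    print("POST", url)
```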

Root-Cause Analysis Workflow for Sync Desync

When downstream consumers report InvalidGeometryException or tile misalignment, execute the following diagnostic sequence to isolate schema drift versus offset lag.

Step 1: Validate Schema Registry Contract

Query the active schema version for the vector_tiles value subject. Cross-reference the geometry field type against the IETF GeoJSON specification (RFC 7946).

bash
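# Subject name follows the routed topic (spatial.sync.vector_tiles) under the default TopicNameStrategy.
# jq -r emits the raw schema string so json.tool can pretty-print it.
curl -s https://schema-registry.mesh.internal:8081/subjects/spatial.sync.vector_tiles-value/versions/latest \
  | jq -r '.schema' | python3 -m json.tool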

Failure Indicator: An unexpected crs property (RFC 7946 removed the crs member, so its presence indicates a pre-RFC 7946 schema and possibly a non-WGS 84 coordinate system), type changed from object to string, or coordinates array depth altered.
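The three indicators can be checked mechanically once the registry response is in hand. This is a sketch under assumptions: it presumes a JSON Schema whose top-level `properties` contains a `geometry` object with a `coordinates` array, which is a hypothetical layout, and `geometry_drift` and `array_depth` are illustrative names. It flags a `crs` member as drift because RFC 7946 removed that member. The expected depth depends on the geometry type (for example 1 for Point, 3 for Polygon).

```python
import json


def array_depth(node: dict) -> int:
    """Count nested JSON Schema array levels starting at this node."""
    depth = 0
    while isinstance(node, dict) and node.get("type") == "array":
        depth += 1
        node = node.get("items", {})
    return depth


def geometry_drift(registry_response: dict, expected_depth: int) -> list:
    """Return a list of human-readable drift findings (empty when clean)."""
    # The registry returns the schema as a JSON-encoded string.
    schema = json.loads(registry_response["schema"])
    geometry = schema.get("properties", {}).get("geometry", {})
    issues = []
    if geometry.get("type") != "object":
        issues.append("geometry type is not object")
    props = geometry.get("properties", {})
    if "crs" in props:
        issues.append("crs member present (pre-RFC 7946 schema)")
    depth = array_depth(props.get("coordinates", {}))
    if depth != expected_depth:
        issues.append(f"coordinates depth {depth}, expected {expected_depth}")
    return issues


# Hypothetical Polygon schema: rings -> positions -> numbers (depth 3).
polygon_coords = {
    "type": "array",
    "items": {"type": "array", "items": {"type": "array", "items": {"type": "number"}}},
}
clean = {"schema": json.dumps({"properties": {"geometry": {
    "type": "object", "properties": {"coordinates": polygon_coords}}}})}
print(geometry_drift(clean, expected_depth=3))
```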

Step 2: Isolate Offset Lag vs. Payload Corruption

Run consumer group lag analysis. High lag with zero DLQ throughput indicates downstream processing bottlenecks. Zero lag with high DLQ throughput indicates schema or topology violations.

bash
kafka-consumer-groups \
  --bootstrap-server broker-1.mesh.internal:9092 \
  --describe \
  --group spatial-feed-sync-prod \
  --offsets
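The decision rule in this step can be written down explicitly. The sketch below is illustrative only: the function name `classify` and both default thresholds are assumptions, since the text says "high" and "zero" without giving numbers for this step.

```python
def classify(total_lag: int, dlq_rate: float,
             lag_threshold: int = 1000, dlq_threshold: float = 0.0) -> str:
    """Map (consumer lag, DLQ throughput) to the likely failure class.

    Thresholds are hypothetical defaults; tune them per deployment.
    """
    high_lag = total_lag > lag_threshold
    dlq_active = dlq_rate > dlq_threshold
    if high_lag and not dlq_active:
        return "downstream processing bottleneck"
    if dlq_active and not high_lag:
        return "schema or topology violation"
    if high_lag and dlq_active:
        return "mixed: investigate both"
    return "healthy"


print(classify(total_lag=50_000, dlq_rate=0.0))
print(classify(total_lag=0, dlq_rate=12.5))
```

The mixed case is not covered by the two rules in the text; treating it as a separate outcome avoids silently attributing it to one cause.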

Step 3: Extract Diagnostic Log Patterns

Filter Connect worker logs for deterministic failure signatures:

bash
grep -E "InvalidGeometryException|SchemaVersionMismatch|OffsetCommitFailed|TopologyDegradation" \
  /var/log/kafka/connect-distributed.log | tail -n 50

  • SchemaVersionMismatch: Registry rejected a schema because it violates the BACKWARD_TRANSITIVE compatibility level.
  • TopologyDegradation: Geometry self-intersection or invalid ring ordering detected during WKT parsing.
  • OffsetCommitFailed: Worker lost coordination with the consumer group coordinator; indicates network partition or broker unavailability.
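When the raw grep output is too noisy, a per-signature count makes the dominant failure class obvious. A minimal sketch follows, using the same four signatures as the grep pattern; `count_signatures` is a hypothetical helper and the sample lines are invented for illustration, not real Connect log output.

```python
import re
from collections import Counter

# Same alternation as the grep -E pattern.
SIGNATURES = re.compile(
    r"InvalidGeometryException|SchemaVersionMismatch|OffsetCommitFailed|TopologyDegradation"
)


def count_signatures(lines) -> Counter:
    """Tally each failure signature across an iterable of log lines."""
    counts = Counter()
    for line in lines:
        counts.update(SIGNATURES.findall(line))
    return counts


# Invented sample lines; in practice, iterate over the open log file instead.
sample = [
    "ERROR task-0 SchemaVersionMismatch subject=vector_tiles",
    "WARN task-0 TopologyDegradation ring ordering",
    "INFO task-0 heartbeat ok",
    "ERROR task-0 SchemaVersionMismatch subject=realtime_feeds",
]
for signature, count in count_signatures(sample).most_common():
    print(signature, count)
```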

Escalation Paths & Incident Response Matrix

Adhere to the following tiered escalation thresholds. All actions must be logged in the incident tracking system with the correlation ID extracted from the X-Trace-Id header.

| Severity | Trigger Condition | Immediate Action | Escalation Path |
| --- | --- | --- | --- |
| P3 | Consumer lag > 500 ms across >2 partitions | Scale out downstream consumers (raising tasks.max has no effect on the single-task Debezium PostgreSQL connector); verify broker I/O throughput | Platform Engineering |
| P2 | DLQ throughput > 5% of ingestion rate; SchemaVersionMismatch detected | Pause connector; revert Schema Registry to last stable version; replay from checkpoint | GIS Data Stewardship + Schema Owners |
| P1 | Sub-second routing SLA breached; TopologyDegradation propagating to tile generators | Isolate affected domain; trigger Federated Ownership & Routing Architecture circuit breaker; route traffic to fallback geocoding chains | Incident Commander + Mesh Architecture Lead |
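The matrix can be encoded as a small triage function for alerting rules. This sketch makes one assumption the table does not settle: trigger conditions separated by a semicolon are treated as alternatives, so either one raises that severity. The function and parameter names are hypothetical; the 500 ms, 2-partition, and 5% thresholds come from the table.

```python
from typing import Dict, Optional

LAG_THRESHOLD_MS = 500       # from the P3 row
DLQ_RATIO_THRESHOLD = 0.05   # from the P2 row (5% of ingestion rate)


def severity(lag_ms_by_partition: Dict[int, float], dlq_ratio: float,
             schema_mismatch: bool, sla_breached: bool,
             topology_degradation: bool) -> Optional[str]:
    """Return the highest matching severity, or None when nothing triggers."""
    lagging = sum(1 for lag in lag_ms_by_partition.values() if lag > LAG_THRESHOLD_MS)
    # Assumption: semicolon-separated triggers are alternatives (logical OR).
    if sla_breached or topology_degradation:
        return "P1"
    if dlq_ratio > DLQ_RATIO_THRESHOLD or schema_mismatch:
        return "P2"
    if lagging > 2:
        return "P3"
    return None


print(severity({0: 600, 1: 700, 2: 800}, 0.0, False, False, False))
```

Checking P1 first ensures that an incident matching several rows is escalated along the most severe path.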

Recovery Protocol for P1 Events:

  1. Halt connector:
    bash
    curl -X PUT \
      http://connect-cluster.mesh.internal:8083/connectors/spatial-feed-sync-prod/pause
    
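  2. Pin the consumer group's offsets at the current position. The group must be inactive for the reset to apply, and the tool requires an explicit topic scope:
    bash
    kafka-consumer-groups \
      --bootstrap-server broker-1.mesh.internal:9092 \
      --group spatial-feed-sync-prod \
      --reset-offsets \
      --all-topics \
      --to-current \
      --execute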
    
  3. Validate schema contract against the latest stable release in the schema registry.
  4. Resume connector and monitor consumer-lag and record-error-rate metrics for 15 minutes before closing the incident.

All spatial sync configurations must be version-controlled in the infrastructure-as-code repository. Manual curl overrides are prohibited outside of active incident response windows. For heavy spatial query backpressure, route to async execution pipelines rather than blocking the ingestion worker.