Configuring domain sync for real-time spatial feeds
When synchronizing real-time spatial feeds across a federated mesh, the primary failure vector is schema drift in vector/tile payloads during cross-domain propagation. Silent coordinate system mismatches (e.g., EPSG:4326 vs EPSG:3857) and topology degradation bypass standard validation layers, corrupting downstream tile generators and routing engines. This implementation enforces strict contract validation at the ingestion boundary, operating within the Domain Sync Protocols for Spatial Data specification. Precise offset management is mandatory to maintain sub-second geospatial routing SLAs and prevent cascading partition rebalances.
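A silent CRS mismatch can often be caught with a simple range check before contract validation runs. The following is a minimal heuristic sketch, not part of the specification referenced above; the function name `guess_crs` and its return labels are hypothetical.

```python
# Heuristic only: EPSG:4326 stores degrees, EPSG:3857 stores metres.
WGS84_MAX_LON = 180.0
WGS84_MAX_LAT = 90.0
WEB_MERCATOR_MAX = 20037508.342789244  # half the projected world width, in metres


def guess_crs(x: float, y: float) -> str:
    """Guess which CRS a coordinate pair was most plausibly written in."""
    if abs(x) <= WGS84_MAX_LON and abs(y) <= WGS84_MAX_LAT:
        # Ambiguous for EPSG:3857 points within about 180 m of the origin.
        return "EPSG:4326"
    if abs(x) <= WEB_MERCATOR_MAX and abs(y) <= WEB_MERCATOR_MAX:
        return "EPSG:3857"
    return "unknown"
```

A feed declared as EPSG:4326 whose points come back as "EPSG:3857" is a strong hint that an upstream domain changed projection without updating its contract.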
Exact Connector Configuration & Routing Syntax
Deploy the following connector configuration to a Kafka Connect distributed worker cluster. The architecture relies on BACKWARD_TRANSITIVE compatibility, which is set per subject in Schema Registry rather than in the connector configuration, to permit additive field evolution while rejecting structural geometry mutations. The RegexRouter transform rewrites the destination topic name based on the source table, so each table's change events land on a dedicated spatial.sync.* topic.
{
  "name": "spatial-feed-sync-prod",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg-primary-geo.internal",
    "database.port": "5432",
    "database.dbname": "mesh_spatial_registry",
    "table.include.list": "public.realtime_feeds,public.vector_tiles",
    "plugin.name": "pgoutput",
    "schema.registry.url": "https://schema-registry.mesh.internal:8081",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://schema-registry.mesh.internal:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "https://schema-registry.mesh.internal:8081",
    "value.converter.schemas.enable": "true",
    "transforms": "routeByEnvelope",
    "transforms.routeByEnvelope.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.routeByEnvelope.regex": "mesh_spatial_registry\\.public\\.(.*)",
    "transforms.routeByEnvelope.replacement": "spatial.sync.$1",
    "offset.flush.timeout.ms": "10000",
    "max.batch.size": "2048",
    "poll.interval.ms": "50",
    "tasks.max": "4",
    "errors.tolerance": "none",
    "errors.deadletterqueue.topic.name": "spatial.dlq.geo",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }
}
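Note: ExtractField$Key extracts a single field from a Kafka record key (a Struct), not from the geometry payload. To route by spatial envelope, use RegexRouter on the topic name (derived from the source table) as shown above, and implement spatial partitioning logic in a custom Partitioner on the producer side. The Debezium connector does not natively support geometry-based routing transforms.
Caveats for the configuration above: the Debezium PostgreSQL connector always runs a single task, so tasks.max values above 1 have no effect; errors.deadletterqueue.topic.name is honored only by sink connectors and only with errors.tolerance=all, so a source-side conversion or transform error fails the task instead of reaching spatial.dlq.geo; and offset.flush.timeout.ms is a worker-level property that belongs in connect-distributed.properties.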
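The routing rule can be checked offline before deployment. The sketch below mimics how RegexRouter applies its pattern (a full match against the topic name, then substitution); it assumes the connector's topic prefix is mesh_spatial_registry, which the regex implies but the configuration does not state. `route_topic` is a hypothetical helper name.

```python
import re

# Same pattern as transforms.routeByEnvelope.regex (JSON escaping removed).
ROUTE_REGEX = re.compile(r"mesh_spatial_registry\.public\.(.*)")
# Python spelling of the Java-style replacement "spatial.sync.$1".
ROUTE_REPLACEMENT = r"spatial.sync.\1"


def route_topic(topic: str) -> str:
    """Return the rewritten topic, or the input unchanged if it does not match."""
    match = ROUTE_REGEX.fullmatch(topic)  # RegexRouter requires a whole-name match
    if match is None:
        return topic
    return match.expand(ROUTE_REPLACEMENT)
```

Topics that do not match the pattern, such as internal heartbeat or schema-change topics, pass through unchanged, which is worth confirming for any topic a downstream consumer subscribes to by name.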
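Apply via the Kafka Connect REST API. Use PUT for idempotent deployment: it creates the connector if it does not exist and otherwise updates its configuration in place, without creating duplicates. The PUT /connectors/{name}/config endpoint expects the flat config map as its body, so strip the name/config wrapper first:
jq '.config' spatial-sync-config.json \
  | curl -X PUT -H "Content-Type: application/json" \
      --data @- \
      http://connect-cluster.mesh.internal:8083/connectors/spatial-feed-sync-prod/config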
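For deployments driven from a pipeline rather than a shell, the same request can be prepared in code. This is a minimal sketch using only the standard library; `build_config_put` is a hypothetical helper, and it only builds the request so that it can be inspected before anything is sent. It sends the flat config map, which is the body shape that PUT /connectors/{name}/config accepts.

```python
import json
import urllib.request


def build_config_put(name: str, connector_json: str, base_url: str) -> urllib.request.Request:
    """Build (but do not send) the idempotent PUT for a connector config."""
    document = json.loads(connector_json)
    # Accept either the {"name", "config"} wrapper or an already-flat config map.
    config = document["config"] if isinstance(document.get("config"), dict) else document
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

Passing the returned object to `urllib.request.urlopen` would perform the deployment; keeping construction separate makes the payload easy to review or log first.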
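Verify partition assignment and consumer group stability. A source connector has no consumer group of its own, so spatial-feed-sync-prod is taken here to be the group ID of the consumers reading the spatial.sync.* topics: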
kafka-consumer-groups \
--bootstrap-server broker-1.mesh.internal:9092 \
--describe \
--group spatial-feed-sync-prod
Idempotent Execution & Offset Management
Spatial routing requires deterministic offset commits to prevent duplicate tile generation or coordinate interpolation gaps. Configure the following operational safeguards:
- Transactional Offset Commits: Enable exactly.once.source.support=enabled in the distributed worker configuration (connect-distributed.properties) to bind offset commits to the producer transaction. This guarantees that geometry payloads and their corresponding offsets are committed atomically.
- Flush Timeout Enforcement: offset.flush.timeout.ms=10000 bounds how long a worker waits for outstanding records to flush before committing offsets, so worker threads do not block indefinitely during high-throughput ingestion bursts. If the timeout is breached, the worker cancels that commit, logs the failure, and retries at the next commit interval. The property has no effect for source connectors running with exactly-once support.
- Idempotent Task Restarts: When a connector task fails, restart only the failed task to avoid full connector rebalancing:
  curl -X POST \
    http://connect-cluster.mesh.internal:8083/connectors/spatial-feed-sync-prod/tasks/0/restart
- Offset Reset Protocol: Never use --reset-offsets --to-earliest on production spatial topics. Instead, scope the reset with --topic <topic>:<partition> and use --reset-offsets --to-offset <offset>, aligned with the last valid geometry envelope checkpoint.
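A targeted reset is easier to review when the command is assembled programmatically and defaults to a dry run. The sketch below is one possible shape; `build_offset_reset` is a hypothetical helper, and the topic name used in the usage note is an assumption derived from the RegexRouter rule. kafka-consumer-groups scopes a reset with --topic <topic>:<partition> and takes the target through --to-offset.

```python
from typing import List


def build_offset_reset(
    bootstrap: str,
    group: str,
    topic: str,
    partition: int,
    offset: int,
    execute: bool = False,
) -> List[str]:
    """Build a kafka-consumer-groups argv for a single-partition offset reset."""
    if partition < 0 or offset < 0:
        raise ValueError("partition and offset must be non-negative")
    return [
        "kafka-consumer-groups",
        "--bootstrap-server", bootstrap,
        "--group", group,
        "--topic", f"{topic}:{partition}",
        "--reset-offsets",
        "--to-offset", str(offset),
        # Dry run unless the caller opts in explicitly.
        "--execute" if execute else "--dry-run",
    ]
```

For example, `build_offset_reset("broker-1.mesh.internal:9092", "spatial-feed-sync-prod", "spatial.sync.vector_tiles", 3, 48210)` yields a command that only prints the planned change; the list can be handed to `subprocess.run` once the checkpoint offset has been confirmed.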
Root-Cause Analysis Workflow for Sync Desync
When downstream consumers report InvalidGeometryException or tile misalignment, execute the following diagnostic sequence to isolate schema drift versus offset lag.
Step 1: Validate Schema Registry Contract
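Query the active schema version for the vector_tiles subject. Cross-reference the geometry field type against the GeoJSON specification (IETF RFC 7946). The schema field in the response is a JSON-encoded string, so extract it raw before pretty-printing. With the default TopicNameStrategy, the subject name derives from the routed topic, so adjust it (for example to spatial.sync.vector_tiles-value) if the lookup returns 404.
curl -s https://schema-registry.mesh.internal:8081/subjects/vector_tiles-value/versions/latest \
  | jq -r '.schema' | python3 -m json.tool
Failure Indicators: an unexpected crs member (RFC 7946 removed crs, so its presence indicates a pre-RFC 7946 schema), type changed from object to string, or coordinates array depth altered.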
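The three failure indicators can be checked mechanically once the schema has been fetched. The sketch below assumes the value schema is a JSON Schema whose geometry field sits at properties.geometry and whose coordinates member is described with nested array/items nodes; that layout and the helper names are assumptions, not something the registry guarantees.

```python
from typing import List


def array_depth(node: dict) -> int:
    """Count how many nested "type": "array" levels a schema node declares."""
    depth = 0
    while isinstance(node, dict) and node.get("type") == "array":
        depth += 1
        node = node.get("items", {})
    return depth


def audit_geometry_schema(schema: dict, expected_depth: int) -> List[str]:
    """Return a list of drift findings; an empty list means no indicator fired."""
    geometry = schema.get("properties", {}).get("geometry")
    if geometry is None:
        return ["geometry field missing"]
    if geometry.get("type") != "object":
        return [f"geometry type is {geometry.get('type')!r}, expected 'object'"]
    findings = []
    members = geometry.get("properties", {})
    if "crs" in members:
        findings.append("crs member present (pre-RFC 7946 schema)")
    depth = array_depth(members.get("coordinates", {}))
    if depth != expected_depth:
        findings.append(f"coordinates depth is {depth}, expected {expected_depth}")
    return findings
```

The expected depth depends on the geometry type the subject carries (1 for Point, 2 for LineString, 3 for Polygon), so it is passed in rather than inferred.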
Step 2: Isolate Offset Lag vs. Payload Corruption
Run consumer group lag analysis. High lag with zero DLQ throughput indicates downstream processing bottlenecks. Zero lag with high DLQ throughput indicates schema or topology violations.
kafka-consumer-groups \
  --bootstrap-server broker-1.mesh.internal:9092 \
  --describe \
  --group spatial-feed-sync-prod \
  --offsets
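The decision rule in this step can be expressed as a small classifier over the lag column of the describe output and a DLQ rate obtained separately. This is a sketch: the `lag_threshold` default is a hypothetical value for what counts as "high lag", and the parser locates the LAG column by its header name rather than by position.

```python
def total_lag(describe_output: str) -> int:
    """Sum the LAG column of `kafka-consumer-groups --describe` output."""
    rows = [line.split() for line in describe_output.splitlines() if line.strip()]
    header = next(row for row in rows if "LAG" in row)
    lag_col = header.index("LAG")
    total = 0
    for row in rows[rows.index(header) + 1:]:
        # Partitions without a committed offset report "-" instead of a number.
        if len(row) > lag_col and row[lag_col].isdigit():
            total += int(row[lag_col])
    return total


def classify(lag: int, dlq_records_per_sec: float, lag_threshold: int = 1000) -> str:
    """Apply the lag-versus-DLQ rule described above."""
    if lag > lag_threshold and dlq_records_per_sec == 0:
        return "downstream-bottleneck"
    if lag == 0 and dlq_records_per_sec > 0:
        return "schema-or-topology-violation"
    return "inconclusive"
```

Mixed signals (some lag and some DLQ traffic) deliberately fall through to "inconclusive", since the text above only defines the two clear-cut cases.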
Step 3: Extract Diagnostic Log Patterns
Filter Connect worker logs for deterministic failure signatures:
grep -E "InvalidGeometryException|SchemaVersionMismatch|OffsetCommitFailed|TopologyDegradation" \
/var/log/kafka/connect-distributed.log | tail -n 50
- SchemaVersionMismatch: Registry rejected a payload due to a BACKWARD_TRANSITIVE violation.
- TopologyDegradation: Geometry self-intersection or invalid ring ordering detected during WKT parsing.
- OffsetCommitFailed: Worker lost coordination with the consumer group coordinator; indicates network partition or broker unavailability.
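When the grep output is long, a per-signature count shows at a glance which failure class dominates. The following is a minimal sketch that counts the four signatures named above across an iterable of log lines; `count_signatures` is a hypothetical helper name.

```python
import re
from collections import Counter
from typing import Iterable

SIGNATURES = (
    "InvalidGeometryException",
    "SchemaVersionMismatch",
    "OffsetCommitFailed",
    "TopologyDegradation",
)
_PATTERN = re.compile("|".join(SIGNATURES))


def count_signatures(lines: Iterable[str]) -> Counter:
    """Count occurrences of each failure signature across log lines."""
    counts: Counter = Counter()
    for line in lines:
        for hit in _PATTERN.findall(line):
            counts[hit] += 1
    return counts
```

Because the function accepts any iterable of strings, an open file handle on the Connect worker log can be passed directly, and `counts.most_common()` then orders the signatures by frequency.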
Escalation Paths & Incident Response Matrix
Adhere to the following tiered escalation thresholds. All actions must be logged in the incident tracking system with the correlation ID extracted from the X-Trace-Id header.
| Severity | Trigger Condition | Immediate Action | Escalation Path |
|---|---|---|---|
| P3 | Consumer lag > 500 ms across > 2 partitions | Scale tasks.max to 8; verify broker I/O throughput | Platform Engineering |
| P2 | DLQ throughput > 5% of ingestion rate; SchemaVersionMismatch detected | Pause connector; revert Schema Registry to last stable version; replay from checkpoint | GIS Data Stewardship + Schema Owners |
| P1 | Sub-second routing SLA breached; TopologyDegradation propagating to tile generators | Isolate affected domain; trigger Federated Ownership & Routing Architecture circuit breaker; route traffic to fallback geocoding chains | Incident Commander + Mesh Architecture Lead |
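The thresholds in the matrix can be encoded so that alerting assigns a severity consistently. In this sketch the `Signals` field names are hypothetical, and where a row lists two trigger conditions the code assumes that either one is sufficient; the matrix itself does not say whether both are required.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Signals:
    partitions_over_500ms: int = 0      # partitions whose consumer lag exceeds 500 ms
    dlq_ratio: float = 0.0              # DLQ throughput divided by ingestion rate
    schema_version_mismatch: bool = False
    routing_sla_breached: bool = False
    topology_degradation: bool = False


def severity(s: Signals) -> Optional[str]:
    """Return the highest matching severity, or None if no trigger fires."""
    # Assumption: either condition in a row is enough to trigger it.
    if s.routing_sla_breached or s.topology_degradation:
        return "P1"
    if s.dlq_ratio > 0.05 or s.schema_version_mismatch:
        return "P2"
    if s.partitions_over_500ms > 2:
        return "P3"
    return None
```

Checking P1 first means that an incident matching several rows is always escalated along the most severe path.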
Recovery Protocol for P1 Events:
- Halt connector:
  curl -X PUT \
    http://connect-cluster.mesh.internal:8083/connectors/spatial-feed-sync-prod/pause
- Drain pending offsets to current position (the consumer group must have no active members while offsets are reset, and the reset needs an explicit topic scope):
  kafka-consumer-groups \
    --bootstrap-server broker-1.mesh.internal:9092 \
    --group spatial-feed-sync-prod \
    --all-topics \
    --reset-offsets \
    --to-current \
    --execute
- Validate schema contract against the latest stable release in the schema registry.
- Resume connector and monitor consumer-lag and record-error-rate metrics for 15 minutes before closing the incident.
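The 15-minute observation window in the last step can be evaluated from collected metric samples rather than by eye. The sketch below assumes samples arrive as (epoch seconds, lag in ms, error rate) tuples; that shape, the helper name, and the default thresholds (500 ms borrowed from the P3 trigger, zero errors) are assumptions.

```python
from typing import Iterable, Tuple

Sample = Tuple[float, float, float]  # (epoch seconds, consumer lag in ms, record error rate)


def window_is_healthy(
    samples: Iterable[Sample],
    window_seconds: float = 15 * 60,
    max_lag_ms: float = 500.0,
    max_error_rate: float = 0.0,
) -> bool:
    """True if the most recent full window contains only in-threshold samples."""
    ordered = sorted(samples)
    if not ordered:
        return False
    window_start = ordered[-1][0] - window_seconds
    if ordered[0][0] > window_start:
        return False  # not enough history to cover the whole window
    recent = [s for s in ordered if s[0] >= window_start]
    return all(lag <= max_lag_ms and err <= max_error_rate for _, lag, err in recent)
```

Returning False when the history is shorter than the window prevents an incident from being closed early on the basis of a few good samples.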
All spatial sync configurations must be version-controlled in the infrastructure-as-code repository. Manual curl overrides are prohibited outside of active incident response windows. For heavy spatial query backpressure, route to async execution pipelines rather than blocking the ingestion worker.