Description
Hello,
I am having issues with the Postgres-CDC source connector.
I have a setup that streams the captured changes on a Postgres table to an Iceberg table.
In our preproduction environment, which has very low traffic, it works fine.
In our production environment it periodically gets java.net.SocketException: Socket is closed, leading to the job being cancelled after too many exceptions.
I have tried increasing the Postgres wal_sender_timeout to 20 minutes; it makes no difference compared to the default of 1 minute.
Some information about our setup:
The job is run with the Zeta engine in a SeaTunnel cluster composed of 2 masters and 2 workers, all in EKS.
The Postgres DB version is 16.8. The Postgres instance and the SeaTunnel cluster are in the same VPC.
SeaTunnel was deployed using the SeaTunnel-provided Helm chart.
The only meaningful change made to the values.yaml is pinning the SeaTunnel version to 2.3.11, as I have never managed to get the Postgres connector to connect to our table in 2.3.12.
Here is the full diff of the values for the sake of completeness:
Common subdirectories: seatunnel-helm-original/conf and seatunnel-helm/conf
Common subdirectories: seatunnel-helm-original/templates and seatunnel-helm/templates
diff --color=auto seatunnel-helm-original/values.yaml seatunnel-helm/values.yaml
24c24
< tag: ""
---
> tag: "2.3.11"
31c31
< value: Asia/Shanghai
---
> value: UTC
48,51c48,51
< prometheus.io/path: /hazelcast/rest/instance/metrics
< prometheus.io/port: "5801"
< prometheus.io/scrape: "true"
< prometheus.io/role: "seatunnel-master"
---
> # prometheus.io/path: /hazelcast/rest/instance/metrics
> # prometheus.io/port: "5801"
> # prometheus.io/scrape: "true"
> # prometheus.io/role: "seatunnel-master"
58c58,59
< nodeSelector: {}
---
> nodeSelector:
> node.family: on-demand
61c62,66
< tolerations: []
---
> tolerations:
> - key: node.family
> operator: Equal
> value: on-demand
> effect: NoSchedule
64,71c69,75
< resources: {}
< # resources:
< # limits:
< # memory: "4Gi"
< # cpu: "4"
< # requests:
< # memory: "2Gi"
< # cpu: "500m"
---
> resources:
> limits:
> memory: "4Gi"
> cpu: "2"
> requests:
> memory: "4Gi"
> cpu: "2"
111a116
> karpenter.sh/do-not-disrupt: "true"
124,131c129,135
< resources: {}
< # resources:
< # limits:
< # memory: "4Gi"
< # cpu: "4"
< # requests:
< # memory: "2Gi"
< # cpu: "500m"
---
> resources:
> limits:
> memory: "10Gi"
> cpu: "2"
> requests:
> memory: "10Gi"
> cpu: "2"
160a165,196
>
> # Secret configuration for database credentials
> secret:
> # Enable secret creation
> enabled: true
> # Secret name
> name: "postgres-credentials"
> # Database credentials
> dbUser: "redacted"
> dbPassword: "redacted" # your password here
>
> # Persistent Volume Claim for checkpoint storage
> persistence:
> # Enable PVC creation for checkpoint storage
> enabled: true
> # PVC name
> name: "seatunnel-checkpoint-pvc"
> # Access mode
> accessModes:
> - ReadWriteMany
> # Storage size
> size: 10Gi
> # Storage class name (optional)
> storageClassName: "efs-sc"
> # Mount path for checkpoints
> mountPath: "/tmp/seatunnel/checkpoint_snapshot"
>
> # ServiceAccount configuration
> serviceAccount:
> # Annotations for the ServiceAccount (e.g., EKS IAM role)
> annotations:
> eks.amazonaws.com/role-arn: "redacted"
Here are the current Postgres parameters we are running with.
wal_sender_timeout 6min
tcp_keepalives_idle 300
tcp_keepalives_interval 30
tcp_keepalives_count 3
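As a quick sanity check on these values (my own arithmetic on the documented parameter semantics, not something measured on the running system): the server-side TCP keepalive path needs idle + interval × count seconds to declare a silent peer dead, while wal_sender_timeout terminates the walsender independently.

```python
# Sketch: worst-case detection windows implied by the parameters above.
def keepalive_detection_seconds(idle: int, interval: int, count: int) -> int:
    """Time for TCP keepalive to give up on a silent peer:
    wait `idle` seconds, then send `count` probes `interval` seconds apart."""
    return idle + interval * count

tcp_keepalive = keepalive_detection_seconds(300, 30, 3)
wal_sender_timeout = 6 * 60  # 6min, from the parameter list above

print(tcp_keepalive)       # 390
print(wal_sender_timeout)  # 360
```

So with these settings wal_sender_timeout (360s) fires slightly before the keepalive probes (390s) would complete, meaning the server can still drop an apparently idle walsender before keepalives prove the client alive.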
This is our job definition:
# SeaTunnel Job: PostgreSQL CDC to Iceberg Test Table
# Streams real-time changes from PostgreSQL redacted to Iceberg table redacted
env {
execution.parallelism = 1
job.retry.times = 3
job.mode = "STREAMING"
job.name = "redacted"
# Enable checkpointing for fault tolerance - every 5 minutes; low-frequency checkpoints allow for less frequent writes to Iceberg
checkpoint.interval = 300000
checkpoint.timeout = 900000
read_limit.rows_per_second = 2500
}
source {
Postgres-CDC {
# Plugin identifier
plugin_output = "postgres_cdc_source"
# Database connection
base-url = "jdbc:postgresql://redacted/redacted?tcpKeepAlive=true"
username = "redacted"
password = ${DB_PASSWORD}
# Database and schema to monitor
database-names = ["redacted"]
schema-names = ["public"]
# Tables to monitor (format: database.schema.table)
table-names = ["redacted"]
# Startup mode
# LATEST: Skip snapshot, start streaming from current WAL position (streaming only)
# INITIAL: Read full snapshot, then continue streaming (recommended for CDC)
# NOTE: After job failure, start fresh job (don't use -r resume, it's broken).
# INITIAL mode + replication slot = automatic recovery from last committed position
startup.mode = "INITIAL"
slot.name = "seatunnel_iceberg_test_slot"
# Decoding plugin (pgoutput is recommended for PostgreSQL 10+)
decoding.plugin.name = "pgoutput"
# Snapshot configuration - optimized for high throughput with 16GB pod
# With 146M rows and 500k chunks = ~300 splits (manageable memory)
snapshot.split.size = 500000 # 500k rows per split (CRITICAL - keeps split count low)
snapshot.fetch.size = 30000 # 30k rows per fetch (high performance)
# Incremental snapshot chunk size (for parallel snapshot reading)
scan.incremental.snapshot.chunk.size = 200000
# Connection settings
connect.timeout.ms = 800000
connect.max-retries = 6
connection.pool.size = 20
# Server timezone
server-time-zone = "UTC"
# Exactly-once semantics (recommended for production)
exactly_once = true
# Pass-through Debezium properties
# Use existing publication created by table owner
debezium = {
"publication.autocreate.mode" = "disabled"
"publication.name" = "seatunnel_cdc_publication"
# keepalive / heartbeat
}
# Output format
format = "DEFAULT"
}
}
sink {
Iceberg {
plugin_input = "postgres_cdc_source"
catalog_name = "glue_catalog"
catalog_type = "glue"
iceberg.table.upsert-mode-enabled = true
primary_keys = "redacted"
# AWS Glue catalog configuration with AssumeRoleAwsClientFactory
iceberg.catalog.config = {
"catalog-impl" = "org.apache.iceberg.aws.glue.GlueCatalog"
"warehouse" = "redacted"
"io-impl" = "org.apache.iceberg.aws.s3.S3FileIO"
"glue.account-id" = "redacted"
# Use AssumeRoleAwsClientFactory for cross-account access
"client.factory" = "org.apache.iceberg.aws.AssumeRoleAwsClientFactory"
"client.assume-role.arn" = "redacted"
"client.assume-role.region" = "redacted
"write.update.mode" = "merge-on-read"
"write.delete.mode" = "merge-on-read"
# "write.target-file-size-bytes" = "67108864" # 64MB files
}
iceberg.table.write-props = {
"write.update.mode" = "merge-on-read"
"write.delete.mode" = "merge-on-read"
}
namespace = "redacted"
table = "redacted"
}
}
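For what it's worth, the split-count estimate in the source block's comments checks out; the 146M row count below is the figure quoted in the config comment, not something I have re-measured:

```python
# Split-count estimate from the snapshot settings in the source block above.
total_rows = 146_000_000          # figure quoted in the config comment
snapshot_split_size = 500_000     # snapshot.split.size

splits = -(-total_rows // snapshot_split_size)  # ceiling division
print(splits)  # 292
```

292 splits, in line with the "~300 splits" noted in the config, so the snapshot phase itself should not be producing an unmanageable number of splits.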
This is our stack trace when the exception happens:
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
... 5 more
Caused by: org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:180)
at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.wal.PostgresWalFetchTask.execute(PostgresWalFetchTask.java:74)
at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:107)
... 5 more
Caused by: org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1166)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:44)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:160)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:125)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:82)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:504)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:215)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:177)
... 7 more
Caused by: java.net.SocketException: Socket is closed
at java.net.Socket.setSoTimeout(Socket.java:1155)
at sun.security.ssl.BaseSSLSocketImpl.setSoTimeout(BaseSSLSocketImpl.java:639)
at sun.security.ssl.SSLSocketImpl.setSoTimeout(SSLSocketImpl.java:73)
at org.postgresql.core.PGStream.hasMessagePending(PGStream.java:210)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1208)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1164)
... 14 more
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
... 2 more
2025-12-03 00:17:29,651 INFO [s.c.s.s.c.ClientExecuteCommand] [SeaTunnel-CompletableFuture-Thread-0] - run shutdown hook because get close signal
And at the same time we see this in the logs of our database.
SSL error: SSLV3_ALERT_UNEXPECTED_MESSAGE
LOG: could not receive data from client: Connection reset by peer
LOG: unexpected EOF on standby connection
CONTEXT: slot "seatunnel_iceberg_test_slot", output plugin "pgoutput"
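One thing I have been looking at on the client side (my own assumption about the mechanics, not something confirmed for pgjdbc internals): tcpKeepAlive=true in the JDBC URL only enables SO_KEEPALIVE on the socket; the probe timing then comes from the kernel of the client pod (Linux defaults to a 7200-second idle time), not from the Postgres tcp_keepalives_* parameters, which govern only the server's sockets. A minimal Python illustration of that split:

```python
import socket

# Enabling SO_KEEPALIVE (what the JDBC tcpKeepAlive=true flag does) only
# flips the flag; probe timing comes from the client kernel's
# net.ipv4.tcp_keepalive_* sysctls unless overridden per socket.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
print(s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0)  # True

# On Linux the per-socket timing can be tuned explicitly:
if hasattr(socket, "TCP_KEEPIDLE"):
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 300)
    print(s.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE))  # 300
s.close()
```

So even with keepalives enabled on both ends, the client pod may not probe for two hours by default, which would leave the server side to notice a dead connection first.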
Does anyone know about this issue?