[Postgres-CDC] Several Socket closed exceptions cause the job to finally be cancelled #10149

Description

@hmalaspina

Hello,
I am having issues with the Postgres-CDC source connector.
I have a setup that streams the captured changes on a Postgres table to an Iceberg table.
In our preproduction environment, which has very low traffic, it works fine.
In our production environment it periodically gets java.net.SocketException: Socket is closed, leading to the job being cancelled after too many exceptions.
I have tried increasing Postgres wal_sender_timeout to 20 minutes; it made no difference compared with the default 1 minute.
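
For reference, this is roughly how the timeout change was applied (a minimal sketch assuming ALTER SYSTEM is available; on a managed Postgres the same setting would go through the provider's parameter settings instead):

ALTER SYSTEM SET wal_sender_timeout = '20min';
SELECT pg_reload_conf();  -- wal_sender_timeout is reloadable, no restart needed
SHOW wal_sender_timeout;  -- verify the new value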

Some information about our setup:
The job runs with the Zeta engine in a SeaTunnel cluster of 2 masters and 2 workers, all in EKS.
The Postgres DB version is 16.8. The Postgres instance and the SeaTunnel cluster are in the same VPC.
SeaTunnel was deployed using the SeaTunnel-provided Helm chart.

The only meaningful change made to the values.yaml is pinning the SeaTunnel version to 2.3.11, as I have never managed to get the Postgres connector to connect to our table in 2.3.12.

Here is the full diff of the values for the sake of completeness:

Common subdirectories: seatunnel-helm-original/conf and seatunnel-helm/conf
Common subdirectories: seatunnel-helm-original/templates and seatunnel-helm/templates
diff --color=auto seatunnel-helm-original/values.yaml seatunnel-helm/values.yaml
24c24
<   tag: ""
---
>   tag: "2.3.11"
31c31
<     value: Asia/Shanghai
---
>     value: UTC
48,51c48,51
<     prometheus.io/path: /hazelcast/rest/instance/metrics
<     prometheus.io/port: "5801"
<     prometheus.io/scrape: "true"
<     prometheus.io/role: "seatunnel-master"
---
>     # prometheus.io/path: /hazelcast/rest/instance/metrics
>     # prometheus.io/port: "5801"
>     # prometheus.io/scrape: "true"
>     # prometheus.io/role: "seatunnel-master"
58c58,59
<   nodeSelector: {}
---
>   nodeSelector:
>     node.family: on-demand
61c62,66
<   tolerations: []
---
>   tolerations:
>     - key: node.family
>       operator: Equal
>       value: on-demand
>       effect: NoSchedule
64,71c69,75
<   resources: {}
<   # resources:
<   #   limits:
<   #     memory: "4Gi"
<   #     cpu: "4"
<   #   requests:
<   #     memory: "2Gi"
<   #     cpu: "500m"
---
>   resources:
>     limits:
>       memory: "4Gi"
>       cpu: "2"
>     requests:
>       memory: "4Gi"
>       cpu: "2"
111a116
>     karpenter.sh/do-not-disrupt: "true"
124,131c129,135
<   resources: {}
<   # resources:
<   #   limits:
<   #     memory: "4Gi"
<   #     cpu: "4"
<   #   requests:
<   #     memory: "2Gi"
<   #     cpu: "500m"
---
>   resources:
>     limits:
>       memory: "10Gi"
>       cpu: "2"
>     requests:
>       memory: "10Gi"
>       cpu: "2"
160a165,196
>
> # Secret configuration for database credentials
> secret:
>   # Enable secret creation
>   enabled: true
>   # Secret name
>   name: "postgres-credentials"
>   # Database credentials
>   dbUser: "redacted"
>   dbPassword: "redacted" # your password here
>
> # Persistent Volume Claim for checkpoint storage
> persistence:
>   # Enable PVC creation for checkpoint storage
>   enabled: true
>   # PVC name
>   name: "seatunnel-checkpoint-pvc"
>   # Access mode
>   accessModes:
>     - ReadWriteMany
>   # Storage size
>   size: 10Gi
>   # Storage class name (optional)
>   storageClassName: "efs-sc"
>   # Mount path for checkpoints
>   mountPath: "/tmp/seatunnel/checkpoint_snapshot"
>
> # ServiceAccount configuration
> serviceAccount:
>   # Annotations for the ServiceAccount (e.g., EKS IAM role)
>   annotations:
>     eks.amazonaws.com/role-arn: "redacted"

Here are the current Postgres parameters we are running with.

wal_sender_timeout 6min
tcp_keepalives_idle 300
tcp_keepalives_interval 30
tcp_keepalives_count 3
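
These can be verified from SQL via the standard pg_settings view (a small sketch):

SELECT name, setting, unit
FROM pg_settings
WHERE name IN ('wal_sender_timeout', 'tcp_keepalives_idle',
               'tcp_keepalives_interval', 'tcp_keepalives_count');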

This is our job definition:

# SeaTunnel Job: PostgreSQL CDC to Iceberg Test Table
# Streams real-time changes from PostgreSQL redacted to Iceberg table redacted

env {
  execution.parallelism = 1
  job.retry.times = 3
  job.mode = "STREAMING"
  job.name = "redacted"
  
  # Enable checkpointing for fault tolerance, every 5 minutes; low-frequency checkpoints allow for less frequent writes to Iceberg
  checkpoint.interval = 300000
  checkpoint.timeout = 900000
  
  
  read_limit.rows_per_second = 2500
}

source {
  Postgres-CDC {
    # Plugin identifier
    plugin_output = "postgres_cdc_source"
    # Database connection
    base-url = "jdbc:postgresql://redacted/redacted?tcpKeepAlive=true"
    username = "redacted"
    password = ${DB_PASSWORD}
    
    # Database and schema to monitor
    database-names = ["redacted"]
    schema-names = ["public"]
    
    # Tables to monitor (format: database.schema.table)
    table-names = ["redacted"]
    
    # Startup mode
    # LATEST: Skip snapshot, start streaming from current WAL position (streaming only)
    # INITIAL: Read full snapshot, then continue streaming (recommended for CDC)
    # NOTE: After job failure, start fresh job (don't use -r resume, it's broken).
    # INITIAL mode + replication slot = automatic recovery from last committed position
    startup.mode = "INITIAL"
    

    slot.name = "seatunnel_iceberg_test_slot"
    
    # Decoding plugin (pgoutput is recommended for PostgreSQL 10+)
    decoding.plugin.name = "pgoutput"
    
    # Snapshot configuration - optimized for high throughput with 16GB pod
    # With 146M rows and 500k chunks = ~300 splits (manageable memory)
    snapshot.split.size = 500000     # 500k rows per split (CRITICAL - keeps split count low)
    snapshot.fetch.size = 30000      # 30k rows per fetch (high performance)
    
    # Incremental snapshot chunk size (for parallel snapshot reading)
    scan.incremental.snapshot.chunk.size = 200000
    
    # Connection settings
    connect.timeout.ms = 800000
    connect.max-retries = 6
    connection.pool.size = 20
    
    # Server timezone
    server-time-zone = "UTC"
    
    # Exactly-once semantics (recommended for production)
    exactly_once = true
    
    # Pass-through Debezium properties
    # Use existing publication created by the table owner (see the SQL sketch after the job definition)
    debezium = {
        "publication.autocreate.mode" = "disabled"
        "publication.name" = "seatunnel_cdc_publication"
        # keepalive / heartbeat
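        # Hypothetical example, not part of our current config: Debezium's
        # heartbeat.interval.ms makes the connector emit periodic heartbeat
        # messages so the replication connection is not idle between changes.
        # "heartbeat.interval.ms" = "60000"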
    }
    
    # Output format
    format = "DEFAULT"
  }
}

sink {
  Iceberg {
    plugin_input = "postgres_cdc_source"
    
    catalog_name = "glue_catalog"
    catalog_type = "glue"
    
    iceberg.table.upsert-mode-enabled = true
    primary_keys = "redacted"
    # AWS Glue catalog configuration with AssumeRoleAwsClientFactory
    iceberg.catalog.config = {
      "catalog-impl" = "org.apache.iceberg.aws.glue.GlueCatalog"
      "warehouse" = "redacted"
      "io-impl" = "org.apache.iceberg.aws.s3.S3FileIO"
      "glue.account-id" = "redacted"
      # Use AssumeRoleAwsClientFactory for cross-account access
      "client.factory" = "org.apache.iceberg.aws.AssumeRoleAwsClientFactory"
      "client.assume-role.arn" = "redacted"
      "client.assume-role.region" = "redacted
      "write.update.mode" = "merge-on-read"
      "write.delete.mode" = "merge-on-read"
      # "write.target-file-size-bytes" = "67108864"  # 64MB files
    }
    
    iceberg.table.write-props = {
      "write.update.mode" = "merge-on-read"
      "write.delete.mode" = "merge-on-read"

    }

    namespace = "redacted"
    table = "redacted"
  }
}
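
For completeness, the publication referenced by publication.name was created ahead of time by the table owner, roughly along these lines (your_table is a placeholder, since the real table name is redacted above):

CREATE PUBLICATION seatunnel_cdc_publication FOR TABLE public.your_table;
-- The replication slot can also be created up front instead of by the connector:
-- SELECT pg_create_logical_replication_slot('seatunnel_iceberg_test_slot', 'pgoutput');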

This is our stack trace when the exception happens:

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
	... 5 more
Caused by: org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:180)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.wal.PostgresWalFetchTask.execute(PostgresWalFetchTask.java:74)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:107)
	... 5 more
Caused by: org.postgresql.util.PSQLException: Database connection failed when reading from copy
	at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1166)
	at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:44)
	at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:160)
	at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:125)
	at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:82)
	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:504)
	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:215)
	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:177)
	... 7 more
Caused by: java.net.SocketException: Socket is closed
	at java.net.Socket.setSoTimeout(Socket.java:1155)
	at sun.security.ssl.BaseSSLSocketImpl.setSoTimeout(BaseSSLSocketImpl.java:639)
	at sun.security.ssl.SSLSocketImpl.setSoTimeout(SSLSocketImpl.java:73)
	at org.postgresql.core.PGStream.hasMessagePending(PGStream.java:210)
	at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1208)
	at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1164)
	... 14 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more
2025-12-03 00:17:29,651 INFO  [s.c.s.s.c.ClientExecuteCommand] [SeaTunnel-CompletableFuture-Thread-0] - run shutdown hook because get close signal

And at the same time we see this in the logs of our database.

SSL error: SSLV3_ALERT_UNEXPECTED_MESSAGE
LOG:  could not receive data from client: Connection reset by peer
LOG:  unexpected EOF on standby connection
CONTEXT: slot "seatunnel_iceberg_test_slot", output plugin "pgoutput"
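
For anyone investigating, the slot and walsender state on the database side can be inspected with something like the following (a sketch; the wal_status column exists on Postgres 13+, so it applies to our 16.8 instance):

-- Is the slot still active, and which backend holds it?
SELECT slot_name, active, active_pid, restart_lsn, confirmed_flush_lsn, wal_status
FROM pg_replication_slots
WHERE slot_name = 'seatunnel_iceberg_test_slot';

-- Walsender connections and their state as seen by the server:
SELECT pid, application_name, state, sent_lsn, flush_lsn, reply_time
FROM pg_stat_replication;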

Does anyone know about this issue?
