This guide will show you how to sink data from RisingWave to PostgreSQL using either the JDBC connector or the native PostgreSQL connector. The sink parameters are similar to those for other JDBC-available databases, such as MySQL. However, we will cover the configurations specific to PostgreSQL and how to verify that data is successfully sunk.

PostgreSQL Sink Options

RisingWave provides two ways to sink data to PostgreSQL:
  1. JDBC Connector (connector = 'jdbc') - Uses JDBC for database connectivity
  2. Native PostgreSQL Connector (connector = 'postgres') - Uses native PostgreSQL protocol
Both approaches support the same data types and provide similar functionality, but the native connector may offer better performance and tighter integration with PostgreSQL-specific features.

Prerequisites

You can test out this process on your own device by using the postgres-sink demo in the integration_test directory of the RisingWave repository.

Set up a PostgreSQL database

  • AWS RDS
  • Self-hosted

Set up a PostgreSQL RDS instance on AWS

Here we will use a standard class instance without Multi-AZ deployment as an example.
  1. Log in to the AWS console. Search “RDS” in services and select the RDS panel.
  2. Create a database with PostgreSQL as the Engine type. We recommend setting up a username and password or using other security options.
  3. When the new instance becomes available, click on its panel.
  4. From the Connectivity panel, we can find the endpoint and connection port information.

Connect to the RDS instance with psql

Now we can connect to the RDS instance. Make sure you have installed psql on your local machine, and start a psql prompt. Fill in the endpoint, the port, and login credentials in the connection parameters.
psql --host=pg-to-rw.xxxxxx.us-east-1.rds.amazonaws.com --port=5432 --username=awsuser --password
For more login options, refer to the RDS connection guide.

Create a table in PostgreSQL

Use the following query to set up a table in PostgreSQL. We will sink to this table from RisingWave.
CREATE TABLE target_count (
  target_id VARCHAR(128) PRIMARY KEY,
  target_count BIGINT
);

Set up RisingWave

Install and launch RisingWave

To install and start RisingWave locally, see the Get started guide. We recommend running RisingWave locally for testing purposes.

Notes about running RisingWave from binaries

If you are running RisingWave locally from binaries and intend to use the native CDC source connectors or the JDBC sink connector, make sure you have JDK 11 or later versions installed in your environment.

Create a sink

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector = 'jdbc' | 'postgres',
   field_name = 'field', ...
);

Parameters (JDBC)

  • sink_name: Name of the sink to be created.
  • sink_from: A clause that specifies the direct source from which data will be output. sink_from can be a materialized view or a table. Either this clause or a SELECT query must be specified.
  • AS select_query: A SELECT query that specifies the data to be output to the sink. Either this query or a FROM clause must be specified. See SELECT for the syntax and examples of the SELECT command.
  • connector: Sink connector. Should be jdbc. To switch an existing JDBC sink to the native postgres connector, set stream_switch_jdbc_pg_to_native = true under [streaming.developer].
  • jdbc.url: Required. The JDBC URL of the destination database necessary for the driver to recognize and connect to the database.
  • user: The user name for the database connection.
  • password: The password for the database connection.
  • jdbc.query.timeout: Specifies the timeout for operations against the downstream database. If not set, the default is 60s.
  • jdbc.auto.commit: Controls whether to automatically commit transactions for the JDBC sink. If not set, the default is false.
  • table.name: Required. The table in the destination database you want to sink to.
  • schema.name: The schema in the destination database you want to sink to. The default value is public.
  • type: Sink data type. Supported types:
      • append-only: Sink data as INSERT operations.
      • upsert: Sink data as UPDATE, INSERT and DELETE operations.
  • primary_key: Required if type is upsert. The primary key of the sink, which should match the primary key of the downstream table.
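For example, a JDBC sink that tightens the downstream operation timeout and enables auto-commit might look like the following. The sink, view, table, and credential names are illustrative, and the value formats follow the defaults listed above:

```sql
CREATE SINK my_jdbc_sink FROM my_mv WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres:5432/mydb',
    user = 'myuser',
    password = '123456',
    table.name = 'my_table',
    type = 'upsert',
    primary_key = 'id',
    jdbc.query.timeout = '30s',   -- lower than the 60s default
    jdbc.auto.commit = 'true'     -- default is false
);
```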

Parameters (Postgres Native)

RisingWave introduced the native Postgres sink connector in version 2.2, and the JDBC sink connector for Postgres will be deprecated in a future release. You can try the native connector in place for your existing JDBC sinks by setting stream_switch_jdbc_pg_to_native = true under [streaming.developer].
  • sink_name: Name of the sink to be created.
  • sink_from: A clause that specifies the direct source from which data will be output. sink_from can be a materialized view or a table. Either this clause or a SELECT query must be specified.
  • AS select_query: A SELECT query that specifies the data to be output to the sink. Either this query or a FROM clause must be specified. See SELECT for the syntax and examples of the SELECT command.
  • connector: Sink connector. Must be postgres.
  • host: Required. The hostname of the destination PostgreSQL server.
  • port: The port of the destination PostgreSQL server. The default value is 5432.
  • user: The user name for the database connection.
  • password: The password for the database connection.
  • database: Required. The database in the destination you want to sink to.
  • table: Required. The table in the destination database you want to sink to.
  • schema: The schema in the destination database you want to sink to. The default value is public.
  • type: Sink data type. Supported types:
      • append-only: Sink data as INSERT operations.
      • upsert: Sink data as UPDATE, INSERT and DELETE operations.
  • primary_key: Required if type is upsert. The primary key of the sink, which should match the primary key of the downstream table.
  • ssl_mode: Determines the level of SSL/TLS encryption for secure communication with Postgres. Accepted values are disabled, preferred, required, verify-ca, and verify-full. The default value is disabled.
      • When set to required, it enforces TLS for establishing a connection;
      • When set to verify-ca, it verifies that the server is trustworthy by checking the certificate chain up to the root certificate stored on the client;
      • When set to verify-full, it verifies the certificate and also ensures the server hostname matches the name in the certificate.
  • ssl_root_cert: Specifies the root certificate secret. You must create a secret first and then reference it here.
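If you plan to use ssl_root_cert, create the secret before creating the sink. A minimal sketch, assuming a secret named postgres_ca_cert and the default meta backend; the certificate body is a placeholder for your actual CA certificate:

```sql
CREATE SECRET postgres_ca_cert
WITH (backend = 'meta')
AS '-----BEGIN CERTIFICATE-----
... certificate contents ...
-----END CERTIFICATE-----';
```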

Sink data from RisingWave to PostgreSQL

Create source and materialized view

You can sink data from a table or a materialized view in RisingWave to PostgreSQL. For demonstration purposes, we’ll create a source and a materialized view, and then sink data from the materialized view. If you already have a table or materialized view to sink data from, you don’t need to perform this step. Run the following query to create a source to read data from a Kafka broker.
CREATE SOURCE user_behaviors (
    user_id VARCHAR,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMPTZ,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'user_behaviors',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
Next, we will create a materialized view that queries the number of targets for each target_id. Note that the materialized view and the target table share the same schema.
CREATE MATERIALIZED VIEW target_count AS
SELECT
    target_id,
    COUNT(*) AS target_count
FROM
    user_behaviors
GROUP BY
    target_id;

Sink from RisingWave

You can use either the JDBC connector or the native PostgreSQL connector to sink data to PostgreSQL. Use the native PostgreSQL connector for better performance and PostgreSQL-specific features:
CREATE SINK target_count_postgres_sink FROM target_count WITH (
    connector = 'postgres',
    host = 'postgres',
    port = '5432',
    user = 'myuser',
    password = '123456',
    database = 'mydb',
    table = 'target_count',
    type = 'upsert',
    primary_key = 'target_id'
);
Alternatively, use the JDBC connector for broader compatibility with PostgreSQL deployments. Use the following query to sink data from the materialized view to the target table in PostgreSQL, and ensure that jdbc.url correctly points to the PostgreSQL database you are connecting to. Note that both examples create a sink named target_count_postgres_sink, so create only one of them. See CREATE SINK for more details.
CREATE SINK target_count_postgres_sink FROM target_count WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres:5432/mydb',
    user = 'myuser',
    password = '123456',
    table.name = 'target_count',
    type = 'upsert',
    primary_key = 'target_id'
);

Verify update

To ensure that the target table has been updated, query from target_count in PostgreSQL.
SELECT * FROM target_count
LIMIT 10;
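Beyond spot-checking rows, you can compare row counts on both sides; once the sink has caught up, the counts should converge. Run each statement on the respective system:

```sql
-- In RisingWave
SELECT COUNT(*) FROM target_count;

-- In PostgreSQL
SELECT COUNT(*) FROM target_count;
```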

Advanced configurations

Native PostgreSQL Connector Parameters

  • host: PostgreSQL server hostname. Required.
  • port: PostgreSQL port number. Optional; defaults to 5432.
  • user: PostgreSQL username. Required.
  • password: PostgreSQL password. Required.
  • database: Target database name. Required.
  • table: Target table name. Required.
  • schema: Target schema name. Optional; defaults to public.
  • type: Sink type: append-only or upsert. Required.
  • ssl_mode: SSL mode: disabled, preferred, required, verify-ca, or verify-full. Optional; defaults to disabled.
  • ssl_root_cert: Root certificate secret for certificate verification. Optional.
  • max_batch_rows: Maximum rows per batch. Optional; defaults to 1024.

JDBC Connector Parameters

  • jdbc.url: JDBC connection URL. Required.
  • user: Database username. Required.
  • password: Database password. Required.
  • table.name: Target table name. Required.
  • type: Sink type: append-only or upsert. Required.
  • primary_key: Primary key columns. Required if type is upsert.

Configuration examples

Native PostgreSQL with SSL

CREATE SINK postgres_native_ssl_sink
FROM secure_data_mv
WITH (
    connector = 'postgres',
    host = 'secure-postgres.example.com',
    port = '5432',
    user = 'risingwave_user',
    password = 'secure_password',
    database = 'secure_db',
    table = 'secure_metrics',
    type = 'upsert',
    ssl_mode = 'verify-ca',
    ssl_root_cert = secret postgres_ca_cert  -- secret created beforehand with CREATE SECRET
);

JDBC with connection pooling

CREATE SINK postgres_jdbc_pooled_sink
FROM high_volume_mv
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres-cluster:5432/analytics_db?sslmode=require&ApplicationName=RisingWave',
    user = 'analytics_user',
    password = 'password',
    table.name = 'high_volume_metrics',
    type = 'upsert',
    primary_key = 'metric_id,timestamp'
);

Batch optimized configuration

CREATE SINK postgres_batch_optimized_sink
FROM performance_metrics_mv
WITH (
    connector = 'postgres',
    host = 'postgres-performance.example.com',
    port = '5432',
    user = 'performance_user',
    password = 'password',
    database = 'performance_db',
    table = 'metrics_table',
    type = 'append-only',
    max_batch_rows = '2048'
);

Cross-schema sink

CREATE SINK postgres_cross_schema_sink
FROM analytics_data_mv
WITH (
    connector = 'postgres',
    host = 'analytics-postgres.example.com',
    port = '5432',
    user = 'analytics_user',
    password = 'password',
    database = 'analytics_db',
    schema = 'analytics',
    table = 'daily_metrics',
    type = 'upsert'
);

Common issues and solutions

  1. Connection timeouts: Increase timeout values or check network connectivity
  2. Authentication failures: Verify credentials and pg_hba.conf settings
  3. Table lock conflicts: Monitor for long-running transactions
  4. Disk space issues: Monitor PostgreSQL data directory space
  5. Memory issues: Check shared_buffers and work_mem settings
  6. SSL certificate errors: Verify certificate configuration for secure connections
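For issue 3, PostgreSQL's pg_stat_activity view can reveal long-running transactions that may be holding locks. Run this on the PostgreSQL side:

```sql
-- Longest-running open transactions first
SELECT pid, state, now() - xact_start AS xact_age, query
FROM pg_stat_activity
WHERE xact_start IS NOT NULL
ORDER BY xact_age DESC;
```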

Performance optimization

  • Batch size: Use optimal batch size (100-1000 recommended)
  • Indexes: Ensure proper indexes on target tables
  • Connection pooling: Configure appropriate connection pool settings
  • VACUUM and ANALYZE: Regular maintenance for optimal performance
  • WAL configuration: Optimize write-ahead logging for write-heavy workloads
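As a sketch of the index and maintenance points above, applied to the demo table. The secondary index is illustrative; the primary key already has an index, so add others only for your read paths:

```sql
-- Run in PostgreSQL
CREATE INDEX IF NOT EXISTS idx_target_count_count
    ON target_count (target_count);

VACUUM (ANALYZE) target_count;
```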

Security best practices

Authentication and authorization

CREATE SINK postgres_secure_sink
FROM secure_data_mv
WITH (
    connector = 'postgres',
    host = 'secure-postgres.example.com',
    port = '5432',
    user = 'secure_user',
    password = 'strong_password',
    database = 'secure_db',
    table = 'secure_table',
    type = 'append-only',
    ssl_mode = 'required'
);

SSL/TLS encryption

CREATE SINK postgres_ssl_sink
FROM sensitive_data_mv
WITH (
    connector = 'postgres',
    host = 'ssl-postgres.example.com',
    port = '5432',
    user = 'ssl_user',
    password = 'ssl_password',
    database = 'ssl_db',
    table = 'ssl_table',
    type = 'append-only',
    ssl_mode = 'verify-ca',
    ssl_root_cert = secret postgres_ca_cert  -- secret created beforehand with CREATE SECRET
);

Limitations

  • Transaction size: Large transactions may impact performance
  • Connection limits: Subject to PostgreSQL max_connections limit
  • Lock contention: Concurrent writes may cause lock conflicts
  • Data type constraints: Some RisingWave types may require conversion
  • Network latency: Performance depends on network connectivity
  • Schema changes: Limited support for automatic schema migration
PostgreSQL has specific limits and performance characteristics. Monitor your database performance and configure appropriate resources for your workload.
Use appropriate indexes, batch sizes, and connection pooling for optimal performance. Regular maintenance (VACUUM, ANALYZE) is important for long-running systems.
Ensure proper authentication and network security. Use SSL/TLS for production deployments and implement proper access controls and monitoring.
For the PostgreSQL data type mapping table, see the Data type mapping table under the Ingest data from PostgreSQL CDC topic. Additional notes regarding sinking data to PostgreSQL:
  • A varchar column in RisingWave can be sunk to a uuid column in PostgreSQL.
  • Only one-dimensional arrays in RisingWave can be sunk to PostgreSQL.
  • For array types, only the smallint, integer, bigint, real, double precision, and varchar element types are currently supported.
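For example, a PostgreSQL target table that receives a RisingWave varchar column into a uuid column, plus a one-dimensional array column (table and column names are illustrative):

```sql
CREATE TABLE events (
    event_id UUID PRIMARY KEY,  -- can be sunk from a VARCHAR column in RisingWave
    tags VARCHAR[]              -- only one-dimensional arrays are supported
);
```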