This guide explains how to stream processed data from RisingWave into existing Iceberg tables. Use this when you have Iceberg tables managed by external systems and want RisingWave to deliver processed results into them.
Prerequisites
- Ensure you have existing Iceberg tables that you can deliver to, or the ability to create them via external systems.
- Access credentials for the underlying object storage (e.g., S3 access key and secret key).
- Appropriate permissions to deliver to the target Iceberg catalog and storage.
- An upstream source, table, or materialized view in RisingWave to output data from.
Basic connection example
CREATE SINK my_iceberg_sink FROM processed_events
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3://my-data-lake/warehouse',
database.name = 'analytics',
table.name = 'processed_user_events',
catalog.type = 'glue',
catalog.name = 'my_glue_catalog',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key',
s3.region = 'us-west-2'
);
Data update modes
Append-only
Use append-only mode when you only want to add new records to the target table:
-- From an append-only source
CREATE SINK events_sink FROM user_events_source
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3://data-lake/warehouse',
database.name = 'events',
table.name = 'user_activity',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key',
s3.region = 'us-west-2'
);
Upsert
Use upsert mode when you need to handle updates and deletes:
-- From an upsert table with primary key
CREATE SINK users_sink FROM user_profiles
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'user_id',
warehouse.path = 's3://data-lake/warehouse',
database.name = 'users',
table.name = 'profiles',
catalog.type = 'rest',
catalog.uri = 'http://rest-catalog:8181',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
Force append-only
Convert upsert streams to append-only by ignoring deletes and converting updates to inserts:
CREATE SINK audit_log FROM user_changes
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
warehouse.path = 's3://data-lake/warehouse',
database.name = 'audit',
table.name = 'user_change_log',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
Catalog configurations
AWS Glue catalog
For tables managed by AWS Glue:
CREATE SINK glue_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3://my-bucket/warehouse',
database.name = 'my_database',
table.name = 'my_table',
catalog.type = 'glue',
catalog.name = 'my_catalog',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key',
s3.region = 'us-west-2'
);
REST catalog
For REST catalog services including AWS S3 Tables:
CREATE SINK rest_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'id',
warehouse.path = 's3://my-bucket/warehouse',
database.name = 'my_database',
table.name = 'my_table',
catalog.type = 'rest',
catalog.uri = 'http://rest-catalog:8181',
catalog.credential = 'username:password',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
JDBC catalog
For JDBC-based catalogs:
CREATE SINK jdbc_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3://my-bucket/warehouse',
database.name = 'my_database',
table.name = 'my_table',
catalog.type = 'jdbc',
catalog.uri = 'jdbc:postgresql://postgres:5432/catalog',
catalog.jdbc.user = 'catalog_user',
catalog.jdbc.password = 'catalog_password',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
Advanced features
Exactly-once delivery
Enable exactly-once delivery semantics for critical data pipelines:
CREATE SINK critical_data FROM important_events
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'event_id',
is_exactly_once = 'true',
warehouse.path = 's3://data-lake/warehouse',
database.name = 'critical',
table.name = 'events',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
Enabling exactly-once delivery provides stronger consistency guarantees but may impact performance due to additional coordination overhead.
Commit configuration
Control commit frequency and retry behavior:
CREATE SINK configurable_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
commit_checkpoint_interval = 10, -- Commit every 10 checkpoints
commit_retry_num = 5, -- Retry failed commits 5 times
warehouse.path = 's3://data-lake/warehouse',
database.name = 'analytics',
table.name = 'metrics',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
The approximate time to commit is calculated as:
time = barrier_interval_ms × checkpoint_frequency × commit_checkpoint_interval
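For example, assuming the default barrier_interval_ms of 1000 ms and checkpoint_frequency of 1 (verify these settings on your cluster), the sink above with commit_checkpoint_interval = 10 commits roughly every 1000 ms × 1 × 10 = 10 seconds.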
Compaction for Iceberg sink
PREMIUM FEATURE: This is a premium feature. For a comprehensive overview of all premium features and their usage, please see RisingWave premium features.
When sinking data to Iceberg, small data files and delete files can accumulate over time, degrading read performance. RisingWave supports Iceberg compaction to merge these files and expire old snapshots.
To configure Iceberg compaction when creating a sink, specify the following parameters in the WITH clause:
| Parameter | Description |
|---|---|
| enable_compaction | Whether to enable Iceberg compaction (true/false). |
| compaction_interval_sec | Interval (in seconds) between two compaction runs. Defaults to 3600 seconds. |
| enable_snapshot_expiration | Whether to enable snapshot expiration. By default, it removes snapshots older than 5 days. |
CREATE SINK sink_demo_rest FROM t
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = true,
s3.endpoint = 'https://s3.ap-southeast-2.amazonaws.com',
s3.region = 'ap-southeast-2',
s3.access.key = 'xxxx',
s3.secret.key = 'xxxx',
s3.path.style.access = 'true',
catalog.type = 'rest',
catalog.uri = 'http://localhost:8181/api/catalog',
warehouse.path = 'warehouse',
database.name = 'ns',
table.name = 't1',
-- The following parameters configure Iceberg compaction
enable_compaction = true,
compaction_interval_sec = 3600,
enable_snapshot_expiration = true
);
Iceberg compaction also requires a dedicated Iceberg compactor. Currently, please contact us via the RisingWave Slack workspace to allocate the necessary resources. We are working on a self-service feature that will let you allocate Iceberg compactors directly from the cloud portal.
RisingWave also supports compaction for the Iceberg table engine. For details, see Compaction for Iceberg table engine.
Use with different storage backends
Amazon S3
CREATE SINK s3_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3://my-bucket/warehouse',
database.name = 'analytics',
table.name = 'events',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key',
s3.region = 'us-west-2'
);
Google Cloud Storage
CREATE SINK gcs_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 'gs://my-bucket/warehouse',
database.name = 'analytics',
table.name = 'events',
catalog.type = 'rest',
catalog.uri = 'http://catalog-service:8181',
gcs.credential = 'xxxxx'
);
gcs.credential is the base64-encoded credential key obtained from the GCS service account key JSON file. To obtain this JSON file, refer to the GCS documentation.
- To encode the key in base64, run the following command and paste the output as the value for this parameter:
cat ~/Downloads/rwc-byoc-test-464bdd851bce.json | base64 -b 0 | pbcopy
- If this field is not specified, ADC (application default credentials) will be used.
Azure Blob Storage
CREATE SINK azure_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 'abfss://container@account.dfs.core.windows.net/warehouse',
database.name = 'analytics',
table.name = 'events',
catalog.type = 'rest',
catalog.uri = 'http://catalog-service:8181',
azblob.account_name = 'your_account',
azblob.account_key = 'your_key'
);
Amazon S3 Tables integration
AWS S3 Tables provides automatic compaction and optimization:
CREATE SINK s3_tables_sink FROM processed_data
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'id',
warehouse.path = 's3://my-bucket/my-table-bucket/',
database.name = 'analytics',
table.name = 'user_metrics',
catalog.type = 'rest',
catalog.uri = 'https://s3tables.us-east-1.amazonaws.com/tables',
catalog.rest.signing_region = 'us-east-1',
catalog.rest.signing_name = 's3tables',
catalog.rest.sigv4_enabled = 'true',
s3.region = 'us-east-1'
);
Data type mapping
RisingWave data types map to Iceberg types as follows:
| RisingWave Type | Iceberg Type | Notes |
|---|---|---|
| BOOLEAN | boolean | |
| SMALLINT | int | |
| INT | int | |
| BIGINT | long | |
| REAL | float | |
| DOUBLE PRECISION | double | |
| VARCHAR | string | |
| BYTEA | binary | |
| DECIMAL(p,s) | decimal(p,s) | |
| TIME | time | |
| DATE | date | |
| TIMESTAMP | timestamp | |
| TIMESTAMPTZ | timestamptz | |
| INTERVAL | string | Serialized as string |
| JSONB | string | Serialized as JSON string |
| ARRAY | list | |
| STRUCT | struct | |
| MAP | map | |
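To illustrate the mapping, the hypothetical RisingWave table below annotates each column with the Iceberg type the sink would write; the table and column names are placeholders.
-- Hypothetical table; comments show the Iceberg type each column maps to
CREATE TABLE order_events (
order_id BIGINT PRIMARY KEY, -- long
is_refund BOOLEAN, -- boolean
note VARCHAR, -- string
payload JSONB, -- string (serialized as JSON)
tags VARCHAR[], -- list of string
created_at TIMESTAMPTZ -- timestamptz
);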
Configuration parameters
Required parameters
| Parameter | Description |
|---|---|
| connector | Must be 'iceberg' |
| type | Sink mode: 'append-only' or 'upsert' |
| database.name | Target Iceberg database name |
| table.name | Target Iceberg table name |
Optional parameters
| Parameter | Description | Default |
|---|---|---|
| primary_key | Primary key for upsert sinks | None |
| force_append_only | Force append-only mode from upsert source | false |
| is_exactly_once | Enable exactly-once delivery | false |
| commit_checkpoint_interval | Commit interval in checkpoints | 60 |
| commit_retry_num | Number of commit retries | 8 |
For detailed storage and catalog configuration, refer to the catalog configuration and storage backend sections above.
Integration patterns
Real-time analytics pipeline
Stream aggregated results to analytics tables:
-- Process real-time events
CREATE MATERIALIZED VIEW hourly_metrics AS
SELECT
user_id,
date_trunc('hour', event_timestamp) as hour,
COUNT(*) as event_count,
COUNT(DISTINCT session_id) as session_count
FROM user_events
GROUP BY user_id, date_trunc('hour', event_timestamp);
-- Sink to data lake for analytics
CREATE SINK analytics_sink FROM hourly_metrics
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'user_id,hour',
warehouse.path = 's3://analytics-lake/warehouse',
database.name = 'metrics',
table.name = 'hourly_user_metrics',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
Change data capture
Stream database changes to data lake:
-- CDC from PostgreSQL
CREATE SOURCE user_changes
WITH (
connector = 'postgres-cdc',
hostname = 'postgres',
port = '5432',
username = 'user',
password = 'password',
database.name = 'app_db',
schema.name = 'public'
);
CREATE TABLE users (
user_id INT PRIMARY KEY,
username VARCHAR,
email VARCHAR,
created_at TIMESTAMPTZ
)
FROM user_changes TABLE 'users';
-- Stream to data lake
CREATE SINK user_lake_sink FROM users
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'user_id',
warehouse.path = 's3://data-lake/warehouse',
database.name = 'raw',
table.name = 'users',
catalog.type = 'glue'
);
Best practices
- Choose appropriate sink mode: Use append-only for event logs, upsert for dimensional data.
- Configure commit intervals: Balance latency vs file size based on your requirements.
- Enable exactly-once for critical data: Use for financial transactions or other critical data.
- Monitor sink lag: Track how far behind your sink is from the source data.
- Design proper partitioning: Ensure target tables are properly partitioned for query performance; see the sketch after this list.
- Handle backpressure: Monitor sink performance and adjust resources as needed.
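Because the target tables are managed by external systems, partitioning is defined when the table is created outside RisingWave. As a non-authoritative sketch, a Spark SQL DDL for a day-partitioned Iceberg table might look like the following; the catalog, namespace, and column names are placeholders.
-- Hypothetical Spark SQL DDL for a day-partitioned Iceberg table
-- that a RisingWave sink can then deliver into
CREATE TABLE my_catalog.analytics.events (
event_id BIGINT,
user_id BIGINT,
event_time TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(event_time));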
Monitoring and troubleshooting
-- Check sink status
SHOW SINKS;
-- View sink details
DESCRIBE SINK my_iceberg_sink;
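For more detail than SHOW SINKS provides, RisingWave exposes sink metadata through system catalogs. As a rough sketch, assuming the rw_catalog.rw_sinks system view is available in your version:
-- Inspect sink metadata via the system catalog (availability and columns may vary by version)
SELECT * FROM rw_catalog.rw_sinks;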
Limitations
- Schema evolution: Limited support for automatic schema changes.
- Concurrent writers: Coordinate with other systems writing to the same tables.