Apache Iceberg + Trino + iceberg-kafka-connector
Guide to setting up Apache Iceberg + Trino locally and writing data to Iceberg in real time using the new iceberg-kafka-connector.
Full Example: https://github.com/hendoxc/recipe/tree/main/iceberg/kafka-connect
First let’s get our setup to the point where we can issue queries to Iceberg tables using Trino.
We will need the following components:
Trino
Apache Iceberg Rest Catalog
Postgres (used by the Rest Catalog)
Minio (as a replacement for s3)
First we need the base docker-compose.yml file:
version: "3"
services:
# add our components here
networks:
iceberg_net:
volumes:
postgres_data:
minio_data:
Minio
This is an object store that is S3 compatible, so any APIs/clients that use S3 are compatible with Minio. It's very simple: you have Buckets and Objects, just like S3.
minio:
image: minio/minio
container_name: minio
environment:
MINIO_ROOT_USER: admin
MINIO_ROOT_PASSWORD: password
MINIO_DOMAIN: minio
networks:
iceberg_net:
aliases:
- demo-iceberg.minio
volumes:
- minio_data:/data
ports:
- 9001:9001
- 9000:9000
command: ["server", "/data", "--console-address", ":9001"]
Let's also add a command that will create a demo-iceberg bucket for our data.
AWS_ACCESS_KEY_ID is the same as MINIO_ROOT_USER
AWS_SECRET_ACCESS_KEY is the same as MINIO_ROOT_PASSWORD
mc:
depends_on:
- minio
image: minio/mc
container_name: mc
networks:
iceberg_net:
environment:
AWS_ACCESS_KEY_ID: admin
AWS_SECRET_ACCESS_KEY: password
AWS_REGION: eu-west-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc mb minio/demo-iceberg;
/usr/bin/mc policy set public minio/demo-iceberg;
tail -f /dev/null
"
You can open the Minio UI at http://localhost:9000 and view the created bucket demo-iceberg in the object explorer.
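If you prefer the command line, you can also check the bucket from inside the mc container we just defined (its entrypoint has already configured the minio alias):
docker exec mc /usr/bin/mc ls minio/
You should see demo-iceberg in the listing.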
Trino
To add a catalog to Trino, you must add the relevant properties file to the folder /etc/trino/catalog.
We will mount this properties file to add an Iceberg catalog to Trino:
iceberg.properties
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://rest:8181
iceberg.rest-catalog.warehouse=s3://demo-iceberg/
iceberg.file-format=PARQUET
# s3 in minio requires hive configuration
# see also https://github.com/trinodb/trino/pull/16557
hive.s3.endpoint=http://minio:9000
hive.s3.path-style-access=true
#hive.s3.aws-access-key=admin
#hive.s3.aws-secret-key=password
iceberg.rest-catalog.uri is the Iceberg REST catalog that we are going to run.
iceberg.rest-catalog.warehouse is the bucket we created plus a path that Iceberg schemas and their tables are written to. I'm using the root of the bucket; you could just as easily use s3://demo-iceberg/some/folder/
And for Trino:
trino:
image: trinodb/trino
container_name: trino
ports:
- "8080:8080"
networks:
iceberg_net:
depends_on:
- rest
- minio
volumes:
- ./iceberg.properties:/etc/trino/catalog/iceberg.properties
environment:
AWS_ACCESS_KEY_ID: admin
AWS_SECRET_ACCESS_KEY: password
AWS_REGION: eu-west-1
AWS_ACCESS_KEY_ID is the same as MINIO_ROOT_USER
AWS_SECRET_ACCESS_KEY is the same as MINIO_ROOT_PASSWORD
Note how we are mounting our iceberg.properties file to /etc/trino/catalog/; this is what lets Trino use Iceberg tables.
You should now have a basic Trino dashboard at http://localhost:8080
Apache Iceberg Rest Catalog
This is what other components like Trino & Kafka Connect (and Spark, Flink, etc.) will use to find out about schemas & tables and their metadata. There are other catalog alternatives out there, like Nessie, the Hive catalog, and the AWS Glue catalog.
To run an instance of the Apache Iceberg REST Catalog, we need an underlying database as the backing store. Let's use Postgres since that's simple:
postgres:
image: postgres:latest
container_name: postgres-iceberg
restart: always
ports:
- "5432:5432"
environment:
POSTGRES_USER: admin
POSTGRES_PASSWORD: password
POSTGRES_DB: iceberg_catalog
networks:
iceberg_net:
healthcheck:
test: ["CMD-SHELL", "pg_isready -U admin -d iceberg_catalog"]
interval: 5s
timeout: 5s
retries: 5
And then for the rest catalog itself:
rest:
image: tabulario/iceberg-rest
container_name: iceberg-rest
networks:
iceberg_net:
depends_on:
postgres:
condition: service_healthy
ports:
- 8181:8181
environment:
AWS_ACCESS_KEY_ID: admin
AWS_SECRET_ACCESS_KEY: password
AWS_REGION: eu-west-1
CATALOG_WAREHOUSE: s3://demo-iceberg/
CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO
CATALOG_S3_ENDPOINT: http://minio:9000
CATALOG_URI: jdbc:postgresql://postgres-iceberg/iceberg_catalog
CATALOG_JDBC_USER: admin
CATALOG_JDBC_PASSWORD: password
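Once the rest service is up, a quick sanity check is to curl its config endpoint (part of the Iceberg REST catalog spec):
curl -s http://localhost:8181/v1/config
It should return a small JSON document with the catalog's defaults and overrides.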
At this point you are set up with Apache Iceberg and Trino, and you can start running SQL with your favourite SQL IDE that supports Trino. I am using the built-in database connections tab in IntelliJ, which only requires a URL (localhost:8080) and a username (admin).
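If you don't have a SQL IDE handy, the trinodb/trino image also ships the Trino CLI, so a quick sanity check straight from the container works too:
docker exec -it trino trino --execute "SHOW CATALOGS"
You should see the iceberg catalog from our mounted properties file alongside Trino's built-in ones.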
Remember you can view the actual underlying structure and data of your tables in the bucket using the Minio UI at localhost:9000
Let's now set up our Kafka-related components.
We will need the following components:
Kafka (we will run it in KRaft mode)
Kafka Connect
Kafka Connect UI (this is optional, but it's nice)
Schema Registry (I'm going to use Avro in this example)
Kafka
3 Kafka Nodes, running in KRaft mode.
kafka1:
image: confluentinc/cp-kafka
container_name: kafka-iceberg-1
hostname: kafka-iceberg-1
networks:
iceberg_net:
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_LISTENERS: 'INTERNAL://kafka-iceberg-1:29092,CONTROLLER://kafka-iceberg-1:29093,EXTERNAL://0.0.0.0:9092'
KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka-iceberg-1:29092,EXTERNAL://localhost:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-iceberg-1:29093,2@kafka-iceberg-2:29093,3@kafka-iceberg-3:29093'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
kafka2:
image: confluentinc/cp-kafka
container_name: kafka-iceberg-2
hostname: kafka-iceberg-2
networks:
iceberg_net:
ports:
- "9093:9093"
environment:
KAFKA_NODE_ID: 2
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_LISTENERS: 'INTERNAL://kafka-iceberg-2:29092,CONTROLLER://kafka-iceberg-2:29093,EXTERNAL://0.0.0.0:9093'
KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka-iceberg-2:29092,EXTERNAL://localhost:9093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-iceberg-1:29093,2@kafka-iceberg-2:29093,3@kafka-iceberg-3:29093'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
kafka3:
image: confluentinc/cp-kafka
container_name: kafka-iceberg-3
hostname: kafka-iceberg-3
networks:
iceberg_net:
ports:
- "9094:9094"
environment:
KAFKA_NODE_ID: 3
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_LISTENERS: 'INTERNAL://kafka-iceberg-3:29092,CONTROLLER://kafka-iceberg-3:29093,EXTERNAL://0.0.0.0:9094'
KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka-iceberg-3:29092,EXTERNAL://localhost:9094'
KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-iceberg-1:29093,2@kafka-iceberg-2:29093,3@kafka-iceberg-3:29093'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
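Once all three brokers are up, you can check that the KRaft quorum has formed by running the metadata quorum tool inside one of the broker containers (the cp-kafka image ships the standard Kafka CLI tools):
docker exec kafka-iceberg-1 kafka-metadata-quorum --bootstrap-server kafka-iceberg-1:29092 describe --status
It should report a current leader and the three voters.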
Kafka Connect
I'm extending the base cp-kafka-connect image to install the iceberg-kafka-connect jar using confluent-hub.
The Dockerfile that kafka-connect is using:
FROM confluentinc/cp-kafka-connect:latest
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt tabular/iceberg-kafka-connect:0.4.11
kafka-connect:
build:
context: .
hostname: kafka-connect
container_name: kafka-connect
ports:
- "8083:8083"
depends_on:
- kafka1
- kafka2
- kafka3
- schema-registry
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka-iceberg-1:29092,kafka-iceberg-2:29092,kafka-iceberg-3:29092
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: _connect_status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
networks:
iceberg_net:
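Once kafka-connect has started (it can take a minute to build and boot), you can confirm the Iceberg sink was installed by asking the Connect REST API for its plugin list:
curl -s http://localhost:8083/connector-plugins | grep -i iceberg
io.tabular.iceberg.connect.IcebergSinkConnector should show up in the output.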
Kafka Connect UI
connect-ui:
image: landoop/kafka-connect-ui
container_name: connect-ui
networks:
iceberg_net:
depends_on:
- kafka-connect
ports:
- "8000:8000"
environment:
CONNECT_URL: http://kafka-connect:8083
Schema Registry
schema-registry:
image: confluentinc/cp-schema-registry
hostname: schema-registry
restart: always
container_name: schema-registry-iceberg
networks:
iceberg_net:
depends_on:
- kafka1
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-iceberg-1:29092,PLAINTEXT://kafka-iceberg-2:29092,PLAINTEXT://kafka-iceberg-3:29092
SCHEMA_REGISTRY_HOST_NAME: schema-registry-1
SCHEMA_REGISTRY_DEBUG: "true"
SCHEMA_REGISTRY_LISTENERS: "http://schema-registry:8081"
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "full_transitive"
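A quick check that Schema Registry is reachable (the subject list will be empty until we register a schema later on):
curl -s http://localhost:8081/subjects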
Let's create the topic that we will produce/consume data to/from.
I have Kafka installed locally (brew install kafka) to easily get the CLI utilities available on my path:
kafka-topics --bootstrap-server localhost:9092 --topic ethereum.mainnet.blocks --partitions 3 --replication-factor 1 --create
Let's make a basic app that produces some data to Kafka, using Avro encoding.
package main
import (
"context"
"fmt"
"github.com/hamba/avro"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sr"
"golang.org/x/crypto/sha3"
"math/rand"
"os"
"time"
)
const (
BootstrapServers = "localhost:9092"
SchemaRegistryUrl = "http://localhost:8081"
topic = "ethereum.mainnet.blocks"
)
type BlockHeader struct {
Number int64 `json:"number" avro:"number"`
Hash string `json:"hash" avro:"hash"`
ParentHash string `json:"parent_hash" avro:"parent_hash"`
GasUsed int64 `json:"gas_used" avro:"gas_used"`
Timestamp int64 `json:"timestamp" avro:"timestamp"`
}
func main() {
kafkaClient, err := CreateKafkaClient()
if err != nil {
panic(err)
}
schemaRegistryClient, err := NewSchemaRegistryClient()
if err != nil {
panic(err)
}
// register the schema and Create Serializer/Deserializer
serde, err := NewSerde[BlockHeader](schemaRegistryClient, "block_header.avsc", "ethereum.mainnet.blocks-value")
if err != nil {
panic(err)
}
currentBlockNum := int64(10000)
currentTimestamp := time.Now()
for i := 0; i < 1000000; i++ {
timestamp := currentTimestamp.Add(time.Duration(30) * time.Second)
block := GenerateBlock(currentBlockNum, timestamp)
currentBlockNum = block.Number
currentTimestamp = timestamp
kafkaClient.ProduceSync(context.Background(), &kgo.Record{
Topic: topic,
Key: []byte(fmt.Sprintf("%d", block.Number)),
Value: serde.MustEncode(block),
})
}
}
func CreateKafkaClient() (*kgo.Client, error) {
kafkaClient, err := kgo.NewClient(kgo.SeedBrokers(BootstrapServers))
if err != nil {
return nil, fmt.Errorf("unable to create kafka client %w", err)
}
return kafkaClient, nil
}
func NewSchemaRegistryClient() (*sr.Client, error) {
schemaRegistryClient, err := sr.NewClient(sr.URLs(SchemaRegistryUrl))
if err != nil {
return nil, fmt.Errorf("unable to create schema registry client %w", err)
}
return schemaRegistryClient, nil
}
func NewSerde[T any](srClient *sr.Client, schemaFilePath string, subject string) (*sr.Serde, error) {
schemaTextBytes, err := os.ReadFile(schemaFilePath)
if err != nil {
return nil, fmt.Errorf("unable to read schema file %w", err)
}
subjectSchema, err := srClient.CreateSchema(context.Background(), subject, sr.Schema{
Schema: string(schemaTextBytes),
Type: sr.TypeAvro,
})
if err != nil {
return nil, fmt.Errorf("unable to create schema %w", err)
}
avroSchema, err := avro.Parse(string(schemaTextBytes))
if err != nil {
return nil, fmt.Errorf("unable to parse schema %w", err)
}
var schemaType T
var serde sr.Serde
serde.Register(
subjectSchema.ID,
schemaType,
sr.EncodeFn(func(a any) ([]byte, error) {
return avro.Marshal(avroSchema, a)
}),
sr.DecodeFn(func(bytes []byte, a any) error {
return avro.Unmarshal(avroSchema, bytes, a)
}),
)
return &serde, nil
}
func GenerateBlock(blockNumber int64, timestamp time.Time) BlockHeader {
blockHash := sha3.New256()
blockHash.Write([]byte(fmt.Sprintf("%d", blockNumber)))
parentHash := sha3.New256()
parentHash.Write([]byte(fmt.Sprintf("%d", blockNumber-1)))
return BlockHeader{
Number: blockNumber + 1,
Hash: fmt.Sprintf("%x", blockHash.Sum(nil)),
ParentHash: fmt.Sprintf("%x", parentHash.Sum(nil)),
GasUsed: rand.Int63n(10000000),
Timestamp: timestamp.Unix(),
}
}
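The producer reads its Avro schema from block_header.avsc, which isn't shown above. A minimal schema matching the BlockHeader struct and its avro tags would look something like this; treat it as a sketch (the record name and namespace here are my own choice):
# block_header.avsc: field names must line up with the struct's avro tags
cat > block_header.avsc <<'EOF'
{
  "type": "record",
  "name": "BlockHeader",
  "namespace": "demo.ethereum",
  "fields": [
    {"name": "number", "type": "long"},
    {"name": "hash", "type": "string"},
    {"name": "parent_hash", "type": "string"},
    {"name": "gas_used", "type": "long"},
    {"name": "timestamp", "type": "long"}
  ]
}
EOF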
Inspecting the Data
Let's use kcat (brew install kcat) to verify that our data is in the topic:
kcat -b localhost:9092 -t \
ethereum.mainnet.blocks -C \
-f '\nKey (%K bytes): %k\t\n%s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'
Notice how we get weird byte strings like:
���51e978306f9b7075e44ba3f18c39d147a49fba6a36d9eda9bdaa4e50e24417df�fc25002675bcb112ca97f28f1ef1c8b38d487b68514d304e80ee16e3e98aca71������
This is the Avro-encoded data; we need to tell kcat how to decode it using Schema Registry by adding the additional args -s value=avro -r http://localhost:8081
kcat -b localhost:9092 -t \
ethereum.mainnet.blocks -C \
-s value=avro -r http://localhost:8081 \
-f '\nKey (%K bytes): %k\t\n%s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'
Now we can see the message values as JSON.
Before we can start sinking data from our topic, we need a table to write to. If you haven't already, create a schema. I'm going to name mine blockchain:
create schema iceberg.blockchain;
Let's create a table:
create table iceberg.blockchain.ethereum_mainnet_blocks(
number BIGINT,
hash VARCHAR,
parent_hash VARCHAR,
gas_used BIGINT,
timestamp TIMESTAMP(6)
)
WITH (
partitioning = ARRAY['day(timestamp)'],
sorted_by = ARRAY['number']
);
partitioning = ARRAY['day(timestamp)'] uses a partition transform, since I want this table partitioned by the block date, not the raw timestamp.
sorted_by = ARRAY['number'] will store the data in files sorted by number. This makes queries that use WHERE number = x much more efficient: the min/max values of each column are stored as metadata, so query engines like Trino know which files inside a partition they need to read and can skip the others.
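You can confirm that both table properties were picked up with SHOW CREATE TABLE, for example via the Trino CLI inside the container:
docker exec -it trino trino --execute "SHOW CREATE TABLE iceberg.blockchain.ethereum_mainnet_blocks"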
Let's take a quick look at our bucket to see what happened (http://localhost:9000, using the user/password set in the docker compose for Minio).
Mint! We can see our blockchain schema created as a folder.
Opening that, I can see our table.
At this point, since you haven't written any data yet, you should only see a metadata folder inside. Once data is written, a new folder called data is created and you can see the actual Parquet data files being written. We will get to writing the data next.
The iceberg-kafka-connect connector uses a control topic to guarantee exactly-once semantics. By default it looks for a topic called control-iceberg, so let's create that:
kafka-topics --bootstrap-server localhost:9092 --topic control-iceberg --partitions 1 --replication-factor 3 --create
Let's now add the connector. I'm going to use the Connect UI: open up localhost:8000 and click NEW; you should see the IcebergSinkConnector available to you.
The connector config:
{
"connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
"transforms.ConvertTimestamp.unix.precision": "seconds",
"iceberg.catalog.s3.secret-access-key": "password",
"iceberg.catalog.s3.endpoint": "http://minio:9000",
"topics": "ethereum.mainnet.blocks",
"tasks.max": "1",
"transforms.ConvertTimestamp.format": "yyyy-MM-dd HH:mm:ss",
"transforms": "ConvertTimestamp",
"transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"iceberg.control.commit.interval-ms": "20000",
"iceberg.catalog.client.region": "eu-west-1",
"iceberg.catalog.uri": "http://rest:8181",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"iceberg.tables": "blockchain.ethereum_mainnet_blocks",
"iceberg.catalog.warehouse": "s3://demo-iceberg/",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms.ConvertTimestamp.target.type": "Timestamp",
"transforms.ConvertTimestamp.field": "timestamp",
"iceberg.catalog.type": "rest",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"iceberg.catalog.s3.access-key-id": "admin"
}
The timestamp field is a Unix timestamp in seconds. I want to convert that to a proper timestamp before it's written to the table, which is what the TimestampConverter transform does.
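If you'd rather not click through the UI, the same config can be submitted to the Kafka Connect REST API. Save the JSON above as iceberg-sink-config.json (the connector name iceberg-blocks-sink below is just an example name I picked) and run:
curl -s -X PUT http://localhost:8083/connectors/iceberg-blocks-sink/config \
  -H "Content-Type: application/json" \
  --data @iceberg-sink-config.json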
The connector will start to write data, and start committing that data to the table.
Let's check the kafka-connect logs to make sure things aren't broken.
A CommitReport log entry means that newly written data is now committed to the table and we can query it.
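You can also poll the connector's state over the Connect REST API, using whatever name you gave it (iceberg-blocks-sink if you went the curl route above):
curl -s http://localhost:8083/connectors/iceberg-blocks-sink/status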
select * from iceberg.blockchain.ethereum_mainnet_blocks;
Choice! The data is queryable as an Iceberg table from Trino.
Let's take a look at what the underlying table actually looks like:
Sweet, there are the day-partitioned folders, each containing data file(s). This means I can query using the timestamp column, and under the hood Trino will use the date to select the correct partitions to read.
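If you'd rather not dig through the bucket, the Trino Iceberg connector also exposes metadata tables such as $partitions, which should show one row per day partition along with row and file counts:
docker exec -it trino trino --execute \
  'SELECT * FROM iceberg.blockchain."ethereum_mainnet_blocks$partitions"'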
What Next:
Converting Hive tables to Iceberg tables
CDC into Iceberg
In case anyone needs it: on newer Trino versions, which use the native S3 file system support instead of the legacy hive.s3.* properties, I had to change the `iceberg.properties` file to:
```
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://rest:8181
iceberg.rest-catalog.warehouse=s3://demo-iceberg/
iceberg.file-format=PARQUET
# https://trino.io/docs/current/object-storage/file-system-s3.html
fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
s3.region=eu-south-2
s3.path-style-access=true
#hive.s3.aws-access-key=admin
#hive.s3.aws-secret-key=password
```