docker/table_topic/spark_iceberg/notebooks/TableTopic - Getting Started.ipynb
This notebook demonstrates the core capabilities of AutoMQ Table Topic: automatic table creation, Upsert mode for data synchronization, and data partitioning. The workflow creates a topic with Upsert and partitioning enabled, sends one Insert (I), one Update (U), and one Delete (D) message, and queries the Iceberg table after each operation.
Import the necessary libraries, initialize the Kafka admin client, Schema Registry client, and Spark session, and define helper functions for creating topics and producing messages.
import uuid
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.cimpl import KafkaException, KafkaError
from datetime import datetime, timezone
from faker import Faker
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
# Configuration constants
KAFKA_BOOTSTRAP_SERVERS = 'automq:9092'
SCHEMA_REGISTRY_URL = 'http://schema-registry:8081'
TOPIC_NAME = 'web_page_view_events'
# Initialize AdminClient and SchemaRegistryClient
admin_client_conf = {'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS}
admin_client = AdminClient(admin_client_conf)
schema_registry_conf = {'url': SCHEMA_REGISTRY_URL}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
# Initialize SparkSession
spark = SparkSession.builder.appName("AutoMQ Table Topic Demo").getOrCreate()
fake = Faker()
# Helper function: Create a Kafka Topic
def create_topic(topic_name, num_partitions=1, replication_factor=1, config=None):
    if config is None:
        config = {}
    topics = [NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor, config=config)]
    futures = admin_client.create_topics(topics, operation_timeout=30)
    for topic, future in futures.items():
        try:
            future.result()
            print(f"Topic '{topic}' created successfully.")
        except KafkaException as e:
            error = e.args[0]
            if error.code() == KafkaError.TOPIC_ALREADY_EXISTS:
                print(f"Topic '{topic}' already exists.")
            else:
                raise Exception(f"Failed to create topic '{topic}': {error.str()}")
# Helper function: Create a Producer
def create_producer():
    producer_conf = {'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS}
    return Producer(producer_conf)
# Helper function: Produce events to Kafka
def produce_events(producer, topic_name, events_data, avro_serializer, string_serializer):
    for event in events_data:
        try:
            producer.produce(
                topic=topic_name,
                key=string_serializer(event.event_id),
                value=avro_serializer(event, SerializationContext(topic_name, MessageField.VALUE)),
                on_delivery=delivery_report
            )
        except Exception as e:
            print(f"Failed to produce event {event.event_id}: {e}")
        producer.poll(0)
    producer.flush()
    print(f"Successfully produced {len(events_data)} event(s) to {topic_name}.")
# Delivery report callback for produced messages
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
        return
    print(f"Message delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")
Create a Kafka topic with Table Topic enabled, configured for Upsert mode and partitioning. The topic uses event_id as the primary key, ops as the operation field, and partitions data by bucket(page_url, 5) and hour(timestamp).
# Define Avro Schema with operation support
schema_str = """
{
"type": "record",
"name": "PageViewEvent",
"namespace": "com.example.events",
"fields": [
{"name": "event_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" }},
{"name": "page_url", "type": "string"},
{"name": "ip_address", "type": "string"},
{"name": "user_agent", "type": "string"},
{"name": "ops", "type": "string"}
]
}
"""
# Define PageViewEvent class
class PageViewEvent:
    def __init__(self, event_id, user_id, timestamp, page_url, ip_address, user_agent, ops):
        self.event_id = event_id
        self.user_id = user_id
        self.timestamp = timestamp
        self.page_url = page_url
        self.ip_address = ip_address
        self.user_agent = user_agent
        self.ops = ops
# Serialization function for events
def event_to_dict(event, ctx):
    return {
        "event_id": event.event_id,
        "user_id": event.user_id,
        "timestamp": event.timestamp,
        "page_url": event.page_url,
        "ip_address": event.ip_address,
        "user_agent": event.user_agent,
        "ops": event.ops
    }
# Create topic with Upsert and partitioning configurations
topic_config = {
'automq.table.topic.enable': 'true',
'automq.table.topic.commit.interval.ms': '2000',
'automq.table.topic.schema.type': 'schema',
'automq.table.topic.upsert.enable': 'true',
'automq.table.topic.id.columns': '[event_id]',
'automq.table.topic.cdc.field': 'ops',
'automq.table.topic.partition.by': '[bucket(page_url, 5), hour(timestamp)]'
}
create_topic(TOPIC_NAME, config=topic_config)
# Initialize serializers and producer
avro_serializer = AvroSerializer(schema_registry_client, schema_str, event_to_dict)
string_serializer = StringSerializer('utf_8')
producer = create_producer()
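Optionally, the topic configuration can be read back through the AdminClient to confirm that the Table Topic settings were applied. The sketch below is not part of the original workflow; it uses confluent_kafka's describe_configs API and assumes the broker reports the automq.table.topic.* entries through the standard describe-configs call.
# Optional check: read back the topic configuration and print the Table Topic settings.
from confluent_kafka.admin import ConfigResource

resource = ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME)
for res, future in admin_client.describe_configs([resource]).items():
    entries = future.result()  # dict of config name -> ConfigEntry
    for name, entry in sorted(entries.items()):
        if name.startswith('automq.table.topic'):
            print(f"{name} = {entry.value}")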
Produce an Insert (I) event to the topic.
event_id = str(uuid.uuid4())
current_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)
insert_event = [PageViewEvent(event_id, fake.user_name(), current_timestamp, fake.uri_path(), fake.ipv4(), fake.user_agent(), "I")]
produce_events(producer, TOPIC_NAME, insert_event, avro_serializer, string_serializer)
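Because automq.table.topic.commit.interval.ms is set to 2000, the first commit may take a couple of seconds to land, and reading the Iceberg table too early can fail. The helper below is an optional sketch (not part of the original flow) that retries the read using the AnalysisException imported earlier; the direct read in the next cell works once the commit has landed.
import time

# Optional helper (illustrative sketch): retry reading the Iceberg table until the
# first Table Topic commit has landed, e.g. df = wait_for_table(f"default.{TOPIC_NAME}").
def wait_for_table(table_name, attempts=10, delay_seconds=2):
    for _ in range(attempts):
        try:
            return spark.read.format("iceberg").load(table_name)
        except AnalysisException:
            time.sleep(delay_seconds)
    raise TimeoutError(f"Iceberg table {table_name} is not available yet")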
Query the Iceberg table to verify the inserted record.
df = spark.read.format("iceberg").load(f"default.{TOPIC_NAME}")
df.show()
spark.sql(f"SELECT file_path FROM default.{TOPIC_NAME}.files").show(vertical=True, truncate=False)
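The partition spec derived from bucket(page_url, 5) and hour(timestamp) can also be inspected. This is an optional check that relies on Spark's standard DESCRIBE output for Iceberg tables.
# Optional: show the table schema and the partition spec derived from
# automq.table.topic.partition.by (bucket(page_url, 5) and hour(timestamp)).
spark.sql(f"DESCRIBE TABLE default.{TOPIC_NAME}").show(n=50, truncate=False)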
Produce an Update (U) event for the same event_id to update the record.
update_event = [PageViewEvent(event_id, fake.user_name(), current_timestamp + 1000, fake.uri_path(), fake.ipv4(), fake.user_agent(), "U")]
produce_events(producer, TOPIC_NAME, update_event, avro_serializer, string_serializer)
Query the Iceberg table to verify the updated record.
df = spark.read.format("iceberg").load(f"default.{TOPIC_NAME}")
df.show()
spark.sql(f"SELECT file_path FROM default.{TOPIC_NAME}.files").show(vertical=True, truncate=False)
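Each Table Topic commit produces a new Iceberg snapshot, so the upsert can also be observed through Iceberg's standard snapshots metadata table. This query is an optional sketch rather than part of the original workflow.
# Optional: list the snapshots produced by the insert and the upsert commits.
spark.sql(f"SELECT committed_at, snapshot_id, operation FROM default.{TOPIC_NAME}.snapshots ORDER BY committed_at").show(truncate=False)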
Produce a Delete (D) event for the same event_id to remove the record.
delete_event = [PageViewEvent(event_id, fake.user_name(), current_timestamp + 2000, fake.uri_path(), fake.ipv4(), fake.user_agent(), "D")]
produce_events(producer, TOPIC_NAME, delete_event, avro_serializer, string_serializer)
Query the Iceberg table to verify that the record has been removed.
df = spark.read.format("iceberg").load(f"default.{TOPIC_NAME}")
df.show()
spark.sql(f"SELECT file_path FROM default.{TOPIC_NAME}.files").show(vertical=True, truncate=False)
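For an explicit confirmation of the delete, you can count the rows that still carry the produced event_id; once the Delete (D) message has been committed, the count is expected to be 0. This is an optional check.
# Optional: the row keyed by event_id should be gone after the delete commit.
remaining = spark.read.format("iceberg").load(f"default.{TOPIC_NAME}").filter(f"event_id = '{event_id}'").count()
print(f"Rows remaining for event_id {event_id}: {remaining}")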
Delete the topic and drop the Iceberg table after the demonstration.
delete_futures = admin_client.delete_topics([TOPIC_NAME], operation_timeout=30)
for topic, future in delete_futures.items():
    future.result()  # wait for the broker to confirm topic deletion
spark.sql(f"DROP TABLE IF EXISTS default.{TOPIC_NAME}")
print(f"Topic '{TOPIC_NAME}' and Iceberg table deleted.")
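If the notebook session ends here, the Spark session can be released as a final optional step.
# Optional: release the Spark session once the demo is finished.
spark.stop()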