AutoMQ Table Topic Demonstration

Source: docker/table_topic/spark_iceberg/notebooks/TableTopic - Getting Started.ipynb

This notebook demonstrates the core capabilities of AutoMQ Table Topic, including automatic table creation, Upsert mode for data synchronization, and data partitioning. The workflow creates a topic with Upsert and partitioning enabled, sends an Insert (I), an Update (U), and a Delete (D) message in turn, and queries the Iceberg table after each operation.

1. Import Libraries and Define Helper Functions

Import necessary libraries and define helper functions for key operations such as creating topics, producing messages, and querying Iceberg tables.

python
import uuid
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.cimpl import KafkaException, KafkaError
from datetime import datetime, timezone
from faker import Faker
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

# Configuration constants
KAFKA_BOOTSTRAP_SERVERS = 'automq:9092'
SCHEMA_REGISTRY_URL = 'http://schema-registry:8081'
TOPIC_NAME = 'web_page_view_events'

# Initialize AdminClient and SchemaRegistryClient
admin_client_conf = {'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS}
admin_client = AdminClient(admin_client_conf)
schema_registry_conf = {'url': SCHEMA_REGISTRY_URL}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Initialize SparkSession
spark = SparkSession.builder.appName("AutoMQ Table Topic Demo").getOrCreate()
fake = Faker()

# Helper function: Create a Kafka Topic
def create_topic(topic_name, num_partitions=1, replication_factor=1, config=None):
    if config is None:
        config = {}
    topics = [NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor, config=config)]
    futures = admin_client.create_topics(topics, operation_timeout=30)
    for topic, future in futures.items():
        try:
            future.result()
            print(f"Topic '{topic}' created successfully.")
        except KafkaException as e:
            error = e.args[0]
            if error.code() == KafkaError.TOPIC_ALREADY_EXISTS:
                print(f"Topic '{topic}' already exists.")
            else:
                raise Exception(f"Failed to create topic '{topic}': {error.str()}")

# Helper function: Create a Producer
def create_producer():
    producer_conf = {'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS}
    return Producer(producer_conf)

# Helper function: Produce events to Kafka
def produce_events(producer, topic_name, events_data, avro_serializer, string_serializer):
    for event in events_data:
        try:
            producer.produce(
                topic=topic_name,
                key=string_serializer(event.event_id),
                value=avro_serializer(event, SerializationContext(topic_name, MessageField.VALUE)),
                on_delivery=delivery_report
            )
        except Exception as e:
            print(f"Failed to produce event {event.event_id}: {e}")
        producer.poll(0)
    producer.flush()
    print(f"Successfully produced {len(events_data)} event(s) to {topic_name}.")

# Delivery report callback for produced messages
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
        return
    print(f"Message delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")

2. Create Topic with Upsert and Partitioning

Create a Kafka topic with Table Topic enabled, configured for Upsert mode and partitioning. The topic uses event_id as the primary key, ops as the operation field, and partitions data by bucket(page_url, 5) and hour(timestamp).

python
# Define Avro Schema with operation support
schema_str = """
{
  "type": "record",
  "name": "PageViewEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" }},
    {"name": "page_url", "type": "string"},
    {"name": "ip_address", "type": "string"},
    {"name": "user_agent", "type": "string"},
    {"name": "ops", "type": "string"}
  ]
}
"""

# Define PageViewEvent class
class PageViewEvent:
    def __init__(self, event_id, user_id, timestamp, page_url, ip_address, user_agent, ops):
        self.event_id = event_id
        self.user_id = user_id
        self.timestamp = timestamp
        self.page_url = page_url
        self.ip_address = ip_address
        self.user_agent = user_agent
        self.ops = ops

# Serialization function for events
def event_to_dict(event, ctx):
    return {
        "event_id": event.event_id,
        "user_id": event.user_id,
        "timestamp": event.timestamp,
        "page_url": event.page_url,
        "ip_address": event.ip_address,
        "user_agent": event.user_agent,
        "ops": event.ops
    }

# Create topic with Upsert and partitioning configurations
topic_config = {
    'automq.table.topic.enable': 'true',
    'automq.table.topic.commit.interval.ms': '2000',
    'automq.table.topic.schema.type': 'schema',
    'automq.table.topic.upsert.enable': 'true',
    'automq.table.topic.id.columns': '[event_id]',
    'automq.table.topic.cdc.field': 'ops',
    'automq.table.topic.partition.by': '[bucket(page_url, 5), hour(timestamp)]'
}
create_topic(TOPIC_NAME, config=topic_config)

# Initialize serializers and producer
avro_serializer = AvroSerializer(schema_registry_client, schema_str, event_to_dict)
string_serializer = StringSerializer('utf_8')
producer = create_producer()
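
To confirm that the table-topic settings took effect, the topic configuration can be read back with the AdminClient. This is an optional sketch, assuming AutoMQ exposes the `automq.table.topic.*` settings as regular topic configs.

python
# Optional (sketch): read back the topic configuration and confirm the table-topic
# settings were applied. Assumes AutoMQ exposes them as regular topic configs.
from confluent_kafka.admin import ConfigResource

resource = ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME)
for res, future in admin_client.describe_configs([resource]).items():
    for name, entry in future.result().items():
        if name.startswith('automq.table.topic'):
            print(f"{name} = {entry.value}")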

3. Insert Operation

Produce an Insert (I) event to the topic.

python
event_id = str(uuid.uuid4())
current_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)
insert_event = [PageViewEvent(event_id, fake.user_name(), current_timestamp, fake.uri_path(), fake.ipv4(), fake.user_agent(), "I")]
produce_events(producer, TOPIC_NAME, insert_event, avro_serializer, string_serializer)
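
Because `automq.table.topic.commit.interval.ms` is set to 2000 ms, the record may not be visible in the Iceberg table immediately after the produce call. A short wait before querying avoids reading an empty table; the same applies after the update and delete steps below. The 5-second delay here is a sketch, not a value from the original notebook.

python
import time

# The table-topic commit interval is 2000 ms, so give the commit a moment to land
# before querying the Iceberg table (sketch; adjust the delay as needed).
time.sleep(5)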

4. Query After Insert

Query the Iceberg table to verify the inserted record.

python
df = spark.read.format("iceberg").load(f"default.{TOPIC_NAME}")
df.show()

spark.sql(f"SELECT file_path FROM default.{TOPIC_NAME}.files").show(vertical=True, truncate=False)
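
In addition to reading the whole table, the inserted row can be checked directly by filtering on the primary-key column. This is a sketch that assumes the Spark catalog resolves `default.{TOPIC_NAME}`, exactly as in the reads above.

python
# Optional (sketch): check the specific row via Spark SQL by filtering on the primary key.
spark.sql(
    f"SELECT event_id, user_id, page_url, ops FROM default.{TOPIC_NAME} "
    f"WHERE event_id = '{event_id}'"
).show(truncate=False)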

5. Update Operation

Produce an Update (U) event for the same event_id to update the record.

python
update_event = [PageViewEvent(event_id, fake.user_name(), current_timestamp + 1000, fake.uri_path(), fake.ipv4(), fake.user_agent(), "U")]
produce_events(producer, TOPIC_NAME, update_event, avro_serializer, string_serializer)

6. Query After Update

Query the Iceberg table to verify the updated record.

python
df = spark.read.format("iceberg").load(f"default.{TOPIC_NAME}")
df.show()

spark.sql(f"SELECT file_path FROM default.{TOPIC_NAME}.files").show(vertical=True, truncate=False)
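
Each commit is also recorded as an Iceberg snapshot, so the effect of the upsert can be observed in the table's `snapshots` metadata table. This is a sketch using Iceberg's standard metadata table and column names, not part of the original notebook.

python
# Optional (sketch): each commit is recorded as an Iceberg snapshot, so the upsert
# should show up as an additional entry in the snapshots metadata table.
spark.sql(
    f"SELECT committed_at, snapshot_id, operation FROM default.{TOPIC_NAME}.snapshots "
    f"ORDER BY committed_at"
).show(truncate=False)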

7. Delete Operation

Produce a Delete (D) event for the same event_id to remove the record.

python
delete_event = [PageViewEvent(event_id, fake.user_name(), current_timestamp + 2000, fake.uri_path(), fake.ipv4(), fake.user_agent(), "D")]
produce_events(producer, TOPIC_NAME, delete_event, avro_serializer, string_serializer)

8. Query After Delete

Query the Iceberg table to verify that the record has been removed.

python
df = spark.read.format("iceberg").load(f"default.{TOPIC_NAME}")
df.show()

spark.sql(f"SELECT file_path FROM default.{TOPIC_NAME}.files").show(vertical=True, truncate=False)
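
A row count makes the effect of the delete explicit: once the delete has been committed, the table should report zero rows. This query is a sketch, not from the original notebook.

python
# Optional (sketch): after the delete is committed, the table should report zero rows.
spark.sql(f"SELECT COUNT(*) AS row_count FROM default.{TOPIC_NAME}").show()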

9. Cleanup

Delete the topic and drop the Iceberg table after the demonstration.

python
# Delete the Kafka topic and wait for the deletion to complete
for future in admin_client.delete_topics([TOPIC_NAME], operation_timeout=30).values():
    future.result()
spark.sql(f"DROP TABLE IF EXISTS default.{TOPIC_NAME}")
print(f"Topic '{TOPIC_NAME}' and Iceberg table deleted.")