Apache Beam

Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. Using one of the open source Beam SDKs, you build a program that defines the pipeline. The pipeline is then executed by one of Beam's supported distributed processing back-ends, which include Apache Flink, Apache Spark, and Google Cloud Dataflow.

Prerequisite

To use Apache Beam, ensure that you have YugabyteDB up and running. Download and install YugabyteDB by following the steps in Quick start.

Setup

To run Apache Beam with YugabyteDB, do the following:

  1. Create a folder apache-beam-test as follows:

    sh
    mkdir apache-beam-test && cd apache-beam-test
    
  2. Create a Python virtual environment and activate it as follows:

    sh
    python3 -m venv myenv
    source myenv/bin/activate
    
  3. Install the beam-nuggets package from PyPI. It provides the relational database transforms used in the next step and pulls in the Apache Beam Python SDK as a dependency:

    sh
    pip install beam-nuggets
    
  4. Create a Python file, democode.py, and add the following code to it:

    python
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import relational_db

    # Sample rows to write; each dict becomes one table row.
    records = [
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
        {'name': 'Mar', 'num': 3},
        {'name': 'Apr', 'num': 4},
        {'name': 'May', 'num': 5},
    ]

    # Connection settings for the local YugabyteDB YSQL endpoint.
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql',  # or 'postgresql+pg8000' to use the pg8000 driver
        host='127.0.0.1',
        port=5433,
        username='yugabyte',
        password='yugabyte',
        database='yugabyte',
        create_if_missing=True  # create the database if it does not exist
    )

    # Target table; created if it does not exist, with num as the primary key.
    table_config = relational_db.TableConfiguration(
        name='months_col01',
        create_if_missing=True,
        primary_key_columns=['num']
    )

    # Build the pipeline; it runs when the with block exits.
    with beam.Pipeline(options=PipelineOptions()) as p:
        months = p | "Reading month records" >> beam.Create(records)
        months | 'Writing to DB table' >> relational_db.Write(
            source_config=source_config,
            table_config=table_config
        )
    if __name__ == "__main__":
        print('demo code ran successfully')
    
  5. Run democode.py as follows:

    sh
    python democode.py
    
    output
    demo code ran successfully
    
  6. You can also verify the changes from the YSQL shell (ysqlsh) as follows:

    sql
    select * from months_col01;
    

    You can see output similar to the following. The row order can vary because the query has no ORDER BY clause:

    output
     num | name
    -----+------
       5 | May
       1 | Jan
       4 | Apr
       2 | Feb
       3 | Mar
    (5 rows)
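
The connection settings passed to SourceConfiguration in democode.py correspond to the usual SQLAlchemy-style database URL, which can be handy when pointing other tools at the same database. The following is a minimal sketch using only the Python standard library, assuming the common driver://user:password@host:port/database layout. The build_db_url helper is hypothetical, written for illustration, and is not part of beam-nuggets or Apache Beam.

```python
from urllib.parse import quote_plus


def build_db_url(drivername, host, port, username, password, database):
    """Return a URL of the form driver://user:password@host:port/database.

    Hypothetical helper for illustration; not a beam-nuggets API.
    """
    # Percent-encode the credentials so characters such as '@' or '/' in a
    # password cannot be mistaken for URL delimiters.
    credentials = f"{quote_plus(username)}:{quote_plus(password)}"
    return f"{drivername}://{credentials}@{host}:{port}/{database}"


# Same values as the SourceConfiguration in democode.py.
url = build_db_url(
    drivername="postgresql",
    host="127.0.0.1",
    port=5433,
    username="yugabyte",
    password="yugabyte",
    database="yugabyte",
)
print(url)  # postgresql://yugabyte:yugabyte@127.0.0.1:5433/yugabyte
```

Encoding the credentials matters only when they contain reserved characters; with the default yugabyte user and password the URL is unchanged.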