Upgrading to 0.3.0

This is a step-by-step guide for upgrading from dagster 0.2.x to 0.3.0. This release represents a substantial upgrade in capabilities but also includes some breaking API changes. We'll detail each change, provide context and reasoning, and give instructions for how to upgrade.

Required API Changes

1. No more top level config subpackage.

Error:

from dagster import (
ImportError: cannot import name 'config'

We have eliminated the public-facing "config" namespace. You now use raw dictionaries instead of a parallel, typed API to configure pipeline runs.

Fix: Simply eliminate the import. You'll run into related errors later; see change 7.

2. No more dagster.sqlalchemy and dagster.pandas submodules.

Error:

E   ModuleNotFoundError: No module named 'dagster.sqlalchemy'

We have moved the pandas and sqlalchemy code into separate modules (dagster-pandas and dagster-sqlalchemy). This leaves the core dagster library with fewer dependencies.

Fix: Instead of importing dagster.sqlalchemy, pip install dagster-sqlalchemy into your virtual environment and import dagster_sqlalchemy instead.
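As a sketch, the change at the import site is just a package rename; the guarded import below only illustrates that the old submodule path no longer resolves (it will also fail if dagster is not installed at all):

```python
# Illustrative sketch: the old 0.2.x submodule import fails after upgrading.
try:
    from dagster import sqlalchemy  # removed in 0.3.0
    old_import_ok = True
except ImportError:
    # expected on 0.3.0 (or when dagster is not installed)
    old_import_ok = False

# New location, after `pip install dagster-sqlalchemy`:
# import dagster_sqlalchemy
```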

3. ConfigDefinition no longer exists.

Error:

ImportError: cannot import name 'ConfigDefinition'

We have eliminated the separate notion of a ConfigDefinition. Instead, we realized that the user-provided config in a solid, resource, or context is just a Field that you would use to build a Dict or Selector. So replace ConfigDefinition with Field. (Generally, config_def=ConfigDefinition(...) becomes config_field=Field(...).)

Before:

```py
"production": PipelineContextDefinition(
    context_fn=generate_production_execution_context,
    config_def=ConfigDefinition(
        # ...
    )
)
```

After:

```py
"production": PipelineContextDefinition(
    context_fn=generate_production_execution_context,
    config_field=Field(
        # ...
    )
)
```

4. New, Simpler Dagster Type Definition API.

Error:

    description='''This represents a path to a file on disk'''
E   TypeError: __init__() got multiple values for argument 'python_type'

Another Error:

E   dagster.check.ParameterCheckError: Param "klass" was supposed to be a type. Got <dagster.core.types.runtime.PythonObjectType object at 0x11e4fbf60> instead of type <class 'dagster.core.types.runtime.PythonObjectType'>

There are now two different type-creation APIs: one for creating new types, and one for annotating existing types that you import.

Examples:

```py
@dagster_type(description='This represents a path to a file on disk')
class PathToFile(str):
    pass

S3FileHandle = as_dagster_type(
    namedtuple('S3FileHandle', 'bucket path'),
    description='''
upload_header_to_s3 and upload_service_line_to_s3 both result in files
being uploaded to s3. Hence the "output" of those two solids is a handle
to a file. The following stages take those as their inputs to create
redshift tables out of them.

Properties:
    - bucket: String
    - path: String
''',
)
```

Note that you can use S3FileHandle and PathToFile as if they were "normal" Python types as well.
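For instance, because the underlying classes are ordinary Python types, instances work anywhere a str or namedtuple would; a minimal sketch (without the dagster wrappers, which register the types with dagster):

```python
from collections import namedtuple

# Sketch: the plain Python types underneath the dagster type annotations.
class PathToFile(str):
    pass

S3FileHandle = namedtuple('S3FileHandle', 'bucket path')

path = PathToFile('/tmp/data.csv')
handle = S3FileHandle(bucket='my-bucket', path='data/header.csv')

assert isinstance(path, str)           # still behaves like a str
assert path.endswith('.csv')           # normal str methods work
assert handle.bucket == 'my-bucket'    # normal namedtuple field access
```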

5. ConfigDictionary --> NamedDict or Dict

We have a much less verbose API for building configuration schema:

Error:

E   AttributeError: module 'dagster.core.types' has no attribute 'ConfigDictionary'

First, we are discouraging use of the types namespace: instead, import Dict (or whichever class you need) directly from dagster. Second, ConfigDictionary is now just NamedDict. Third, if the name of the type isn't particularly relevant, you can eliminate it and just use Dict, which does not need to be named. The net result is much nicer:

Before:

```py
types.ConfigDictionary(
    'DefaultContextConfig',
    {
        'data_source_run_id': Field(types.String, description='''
            This is a run id generated by the caller of this pipeline. Right
            now this is required to tie a single run id to multiple executions
            of the same pipeline.
        '''),
        'conf': Field(types.Any),
    },
)
```

After:

```py
Dict({
    'data_source_run_id': Field(String, description='''
        This is a run id generated by the caller of this pipeline. Right
        now this is required to tie a single run id to multiple executions
        of the same pipeline.
    '''),
    'conf': Field(Any),
})
```

This is a fairly mechanical transition.

6. define_stub_solid no longer in top-level dagster

This is now an internal utility function. If you really, really need it:

```py
from dagster.core.utility_solids import define_stub_solid
```

7. Environments are raw dictionaries rather than config.* classes

Per change 1, the config classes are no longer public and are no longer used in the execute_pipeline family of APIs. Use raw dictionaries instead; they should be shaped exactly like the yaml files.

Before:

```py
environment = config.Environment(
    context=config.Context(
        name='unittest',
        config={
            'data_source_run_id': str(uuid.uuid4()),
            'conf': CONF,
            'log_level': 'ERROR',
            'cleanup_files': False,
        }
    ),
    solids={
        'unzip_file': config.Solid({
            'zipped_file': ZIP_FILE_PATH
        }),
    },
)
```

After:

```py
environment = {
    'context': {
        'unittest': {
            'config': {
                'data_source_run_id': str(uuid.uuid4()),
                'conf': CONF,
                'log_level': 'ERROR',
                'cleanup_files': False,
            }
        }
    },
    'solids': {
        'unzip_file': {
            'config': {
                'zipped_file': ZIP_FILE_PATH,
            }
        }
    }
}
```

While providing fewer guarantees within the Python type system, this API results in very high quality error checking and messaging from the dagster config schema.
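As a quick illustration of that shape (hypothetical values): solid config lives under solids.&lt;solid_name&gt;.config, and context config under context.&lt;context_name&gt;.config, exactly mirroring the yaml nesting:

```python
# Hypothetical environment dict, shaped like the equivalent yaml file.
environment = {
    'context': {
        'unittest': {
            'config': {'log_level': 'ERROR'},
        },
    },
    'solids': {
        'unzip_file': {
            'config': {'zipped_file': '/tmp/data.zip'},
        },
    },
}

# Navigation mirrors the yaml nesting exactly.
assert environment['context']['unittest']['config']['log_level'] == 'ERROR'
assert environment['solids']['unzip_file']['config']['zipped_file'] == '/tmp/data.zip'
```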

8. New testing APIs

Error:

 AttributeError: type object 'PipelineDefinition' has no attribute 'create_sub_pipeline'

or

AttributeError: type object 'PipelineDefinition' has no attribute 'create_single_solid_pipeline'

The creation of "sub" and "single_solid" pipelines was awkward and error-prone. Instead, we have new functions: execute_solid and execute_solids. You can now execute a single solid with a single function call.

Before:

```py
pipeline = PipelineDefinition.create_single_solid_pipeline(
    define_fileload_pipeline(),
    'unzip_file',
)

result = execute_pipeline(pipeline, environment)

assert result.success
assert os.path.exists(
    result.result_for_solid('unzip_file').transformed_value())
```

After:

```py
solid_result = execute_solid(
    define_fileload_pipeline(),
    'unzip_file',
    environment=environment
)

assert solid_result.success
assert os.path.exists(solid_result.transformed_value())
```

Before (with stubbed inputs):

```py
pipeline = PipelineDefinition.create_single_solid_pipeline(
    define_fileload_pipeline(),
    'split_headers_and_service_lines',
    {
        'split_headers_and_service_lines': {
            'unzipped_file':
            define_stub_solid('unzipped_path_value', unzipped_path)
        }
    },
)

result = execute_pipeline(pipeline, environment)

assert result.success
solid_result = result.result_for_solid('split_headers_and_service_lines')
assert os.path.exists(solid_result.transformed_value('header_file'))
assert os.path.exists(solid_result.transformed_value('service_lines_file'))
```

After (with stubbed inputs):

```py
solid_result = execute_solid(
    define_fileload_pipeline(),
    'split_headers_and_service_lines',
    inputs={
        'unzipped_file': unzipped_path,
    },
    environment=environment,
)

assert os.path.exists(solid_result.transformed_value('header_file'))
assert os.path.exists(solid_result.transformed_value('service_lines_file'))
```

Before (subset execution):

```py
pipeline = PipelineDefinition.create_sub_pipeline(
    define_fileload_pipeline(),
    ['unzip_file'],
    ['split_headers_and_service_lines'],
    {},
)

result = execute_pipeline(pipeline, environment)

assert result.success
solid_result = result.result_for_solid('split_headers_and_service_lines')
snapshot_check_results(snapshot, solid_result)
```

After (subset execution):

```py
result_dict = execute_solids(
    define_fileload_pipeline(),
    ['unzip_file', 'split_headers_and_service_lines'],
    environment=environment,
)

snapshot_check_results(snapshot, result_dict['split_headers_and_service_lines'])
```

9. Execution Context Lifecycle Changes

Error:

AttributeError: 'ExecutionContext' object has no attribute 'value'

This is officially the most conceptually difficult change. We changed the system so that the ExecutionContext passed to your solids (now RuntimeExecutionContext) is constructed by the system rather than by the user. The ExecutionContext object the user creates can instead be thought of as RuntimeExecutionContextParams; we opted against that name because it was excessively verbose.

Before:

```py
with context.value('data_source_run_id', data_source_run_id),\
    context.value('data_source', 'new_data'),\
    context.value('pipeline_run_id', pipeline_run_id):

    yield ExecutionContext(
        loggers=[define_colored_console_logger('dagster', log_level)],
        resources=resources
    )
```

After:

```py
# because you no longer need the with clause here you can just return
# the ExecutionContext object directly
return ExecutionContext(
    loggers=[define_colored_console_logger('dagster', log_level)],
    resources=resources,
    tags={
        'data_source_run_id': data_source_run_id,
        'data_source': 'new_data',
        'pipeline_run_id': pipeline_run_id,
    },
)
```

10. Non-null by default

Error:

E   dagster.core.errors.DagsterTypeError: Solid solid_name input input_name received value None which does not pass the typecheck for Dagster type PandasDataFrame. Step solid_name.transform

You have encountered a type error. Likely this is because in 0.2.8 types accepted None by default, which is no longer true in 0.3.0. You now have to opt into accepting nulls.

Before:

```py
@solid(outputs=[OutputDefinition(dagster_type=dagster_pd.DataFrame)])
def return_none(context):
    return None  # None no longer allowed; would break at runtime
```

After:

```py
@solid(outputs=[OutputDefinition(dagster_type=Nullable(dagster_pd.DataFrame))])
def return_none(context):
    return None  # because of the Nullable wrapper, this is ok
```

11. Solid name uniqueness per-repository enforced by default

Error:

 dagster.core.errors.DagsterInvalidDefinitionError: Trying to add duplicate solid def solid_one in pipeline_two, Already saw in pipeline_one.

We now enforce by default that solid names are unique per repository. This sets up a future where you can look up a solid by name in a repository and view it as an independent entity, among other things. One feature this enables is the ability to index back into all the places where that solid is used.

As a temporary measure, we have added an enforce_uniqueness boolean flag to RepositoryDefinition construction. However, this escape hatch will not be supported forever, as we will be building features that rely on uniqueness.

The temporary fix is:

```py
return RepositoryDefinition(
    name='repo_name',
    enforce_uniqueness=False,  # temporarily opt out of the uniqueness check
    pipeline_dict={...}
)
```

The preferred option is to make solid names unique. Prefixing the solids in offending pipelines with the pipeline name is a straightforward way to solve this quickly, and it guarantees that a later change will not trigger this error again.
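A minimal sketch of that prefixing convention, using a hypothetical helper (not part of dagster):

```python
# Hypothetical helper: make a solid name repository-unique by
# prefixing it with its pipeline's name.
def prefixed_solid_name(pipeline_name, solid_name):
    return '{}_{}'.format(pipeline_name, solid_name)

# The two formerly colliding definitions of solid_one now get distinct names.
name_one = prefixed_solid_name('pipeline_one', 'solid_one')
name_two = prefixed_solid_name('pipeline_two', 'solid_one')

assert name_one == 'pipeline_one_solid_one'
assert name_one != name_two
```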

12. Context is now a top-level argument to solids

This is not a breaking change, but it improves developer ergonomics and is relatively straightforward to adopt.

Before:

```py
@solid
def a_solid(info):
    info.context.info('something')
    info.context.resources.a_resource.do_something()
```

After:

```py
@solid
def a_solid(context):
    context.log.info('something')  # "log" in the name is clearer
    context.resources.a_resource.do_something()  # resources available top-level
    context.run_id  # run_id available as a top-level property
    # info.context.config is gone; it was confusing when switching
    # between resources, contexts, and solids
    context.solid_config
```

The ability to refer to info.context will go away fairly soon (there is a legacy adapter class to enable backwards compatibility, and we do not want it to be immortal). We also want to enforce that the name of the first argument is context; we are only allowing info temporarily.