docs/2.developers/4.user-guide/60.deployment/55.persistence.md
The Pathway framework helps you build computational pipelines easily and flexibly. In the course of development, you may need to save the state of the computation. This is where persistence comes in.
For example, persistence may be needed for one or several of the following reasons:
Suppose that we're solving a simple wordcount task. The input consists of a directory of CSV files, which needs to be polled constantly. Each file consists of a header and several lines, each containing a word.
The output should be a file containing a sequence of JSON lines, each with the fields "word" and "count", denoting the word and the number of its occurrences.
Given that, the problem can be solved with the following small Pathway program:
import pathway as pw
class InputSchema(pw.Schema):
    word: str
words = pw.io.csv.read("inputs/", schema=InputSchema)
word_counts = words.groupby(words.word).reduce(words.word, count=pw.reducers.count())
pw.io.jsonlines.write(word_counts, "result.jsonlines")
pw.run()
This program is not persistent. That means that when restarted, it starts the computation from the beginning, scanning the files one by one, processing the word counts, and outputting them.
However, it can easily be made persistent.
The only essential block that Pathway needs is the persistent storage configuration. Suppose that we store the intermediate state in a local directory called state. Then we obtain the following persistence config:
persistence_backend = pw.persistence.Backend.filesystem("./state/")
persistence_config = pw.persistence.Config(persistence_backend)
Now, the persistence config needs to be passed into the pw.run method. It can be done as follows:
pw.run(persistence_config=persistence_config)
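Putting the pieces above together, the persistent version of the word count program might look like the following sketch. It only combines the calls already shown on this page. The wrapper function `run_persistent_wordcount` and its parameter names are illustrative and not part of Pathway, and the import is placed inside the function so that the snippet can be loaded without starting a pipeline.

```python
def run_persistent_wordcount(
    input_dir="inputs/",
    output_path="result.jsonlines",
    state_dir="./state/",
):
    """Run the word count pipeline with filesystem-backed persistence.

    Hypothetical helper: the name and parameters are chosen for illustration.
    """
    # Imported here so that merely defining the function has no side effects.
    import pathway as pw

    class InputSchema(pw.Schema):
        word: str

    # Same pipeline as the non-persistent version.
    words = pw.io.csv.read(input_dir, schema=InputSchema)
    word_counts = words.groupby(words.word).reduce(
        words.word, count=pw.reducers.count()
    )
    pw.io.jsonlines.write(word_counts, output_path)

    # The only addition: a persistence config passed to pw.run.
    persistence_backend = pw.persistence.Backend.filesystem(state_dir)
    persistence_config = pw.persistence.Config(persistence_backend)
    pw.run(persistence_config=persistence_config)
```

Calling `run_persistent_wordcount()` starts the pipeline. Because the input directory is polled constantly, the call is expected to keep running until the program is stopped.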
With these modifications, the program becomes persistent: when restarted, it continues the computation and the output from where it stopped last time. Now let's look at the main concepts of the persistence setup in more detail.
Please note that there is also a tutorial with a step-by-step explanation available.
First, Pathway needs storage, which can persist the internal state, along with the checkpoint and metainformation. The persistence mechanism combines internal state serialization with a write-ahead logging mechanism. It can store the data either locally or in S3.
The storage is configured with an instance of the pw.persistence.Config class, which should be passed as an argument to the pw.run command that executes the data pipeline. The persistence config accumulates several settings:
Both the metadata and snapshot storages are configured with the pw.persistence.Backend class, which has two methods for S3 and filesystem storage configuration.
The config itself is created via its constructor, which accepts a backend argument of the type pw.persistence.Backend and, optionally, the snapshot interval given in milliseconds.
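As a minimal sketch of the optional snapshot interval, the config could be built as below. The keyword name `snapshot_interval_ms` is an assumption on my part and should be checked against the API reference for your Pathway version. The helper `make_persistence_config` and the 5000 ms value are purely illustrative.

```python
def make_persistence_config(state_dir="./state/", snapshot_interval_ms=5000):
    """Build a filesystem-backed persistence config (hypothetical helper)."""
    # Imported here so that defining the helper does not require a running pipeline.
    import pathway as pw

    backend = pw.persistence.Backend.filesystem(state_dir)
    # Assumption: the interval is passed via the `snapshot_interval_ms` keyword.
    return pw.persistence.Config(
        backend,
        snapshot_interval_ms=snapshot_interval_ms,
    )
```

The returned object would then be passed as `pw.run(persistence_config=make_persistence_config())`.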
This, however, is not the only thing needed for persistence to work, which brings us to unique names.
To persist certain input sources, Pathway uses the name parameter. This identifier denotes the actual data source, so it is expected to stay the same across reruns of the program.
The motivation behind unique names is that a data source can change as much as the computation requires, as long as it still contains data with the same schema. For example, it is possible that:
All in all, the variety of possible changes is huge. Still, by using the same name, the engine knows that the data corresponds to a certain table.
These names can be assigned in two different ways. The first is automatic generation, which assigns names to the sources based on the order in which they are added. For example, if the program first reads a dataset from a CSV file and then reads a stream of events from Kafka, two unique names are generated automatically: the first points to the dataset and the second to the stream of events. Please note that this approach is fine only if the code is not going to be updated. Otherwise, the sources may change, which results in different unique names being generated that no longer correspond to the old ones.
The second way is a bit more involved, but it allows more flexibility: the unique names can be assigned manually in the input connector. To do so, specify the string parameter name. For example, the word count program above relied on automatic name generation. However, it would also be possible to specify the name explicitly this way:
words = pw.io.csv.read("inputs/", schema=InputSchema, name="words_source")
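To see why order-based names are fragile while explicit ones are not, consider the toy model below. It is not Pathway's actual naming scheme: the `source-<index>` format and all source labels are made up for illustration. It only shows that adding a source in front of the existing ones shifts every automatically assigned name, whereas manually chosen names keep pointing at the same data.

```python
def auto_names(sources):
    """Assign names purely by position (a toy stand-in for automatic generation)."""
    return {f"source-{i}": src for i, src in enumerate(sources)}


# First version of the program: a CSV dataset, then a Kafka stream.
before = auto_names(["csv_dataset", "kafka_events"])

# Updated version: a new source is added before the existing ones.
after = auto_names(["new_s3_source", "csv_dataset", "kafka_events"])

# The name that used to denote the CSV dataset now denotes a different source,
# so state persisted under it would no longer match.
shifted = before["source-0"] != after["source-0"]

# With manually assigned names, the old names still denote the same sources.
manual_before = {"dataset": "csv_dataset", "events": "kafka_events"}
manual_after = {
    "audit": "new_s3_source",
    "dataset": "csv_dataset",
    "events": "kafka_events",
}
stable = all(manual_after[name] == src for name, src in manual_before.items())

print(shifted, stable)
```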
The framework currently maintains a snapshot of the internal state, along with the metadata required for recovery. When a program starts, Pathway first looks for persisted checkpoints. It then loads the snapshot data and the information about the next unread offsets in the data sources. Due to this, there is a requirement for the data sources to be persistent, so that if the program is terminated for whatever reason, the next entries can be re-read on restart. Please note that most of the data sources comply with this requirement: that is, S3, Kafka topic, or filesystem entries can easily be re-read if the program restarts.
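The recovery procedure described above can be illustrated with a small standalone simulation. This is a toy model and not Pathway's implementation: the JSON checkpoint file, the `process` function, and the `stop_after` parameter are all hypothetical. It shows a checkpoint that stores both the state snapshot and the next unread offset, and a restart that re-reads only the remaining entries from a re-readable source.

```python
import json
import os
import tempfile


def process(words, state_path, stop_after=None):
    """Count words, checkpointing the counts and the next unread offset.

    `stop_after` simulates the program being terminated after that many entries.
    Returns the counts and the number of entries read during this run.
    """
    # On start, look for a persisted checkpoint and load it if present.
    if os.path.exists(state_path):
        with open(state_path) as f:
            checkpoint = json.load(f)
    else:
        checkpoint = {"offset": 0, "counts": {}}
    offset, counts = checkpoint["offset"], checkpoint["counts"]

    read_now = 0
    while offset < len(words):
        if stop_after is not None and read_now >= stop_after:
            break  # simulated termination
        word = words[offset]
        counts[word] = counts.get(word, 0) + 1
        offset += 1
        read_now += 1
        # Persist the snapshot together with the next unread offset.
        with open(state_path, "w") as f:
            json.dump({"offset": offset, "counts": counts}, f)
    return counts, read_now


# The source must be re-readable: here it is simply a list kept in memory.
words = ["a", "b", "a", "c", "a"]
with tempfile.TemporaryDirectory() as state_dir:
    state_path = os.path.join(state_dir, "checkpoint.json")
    process(words, state_path, stop_after=2)           # first run, interrupted
    final_counts, reread = process(words, state_path)  # restart

print(final_counts, reread)
```

The second run starts from offset 2 instead of 0, which is only possible because the remaining entries can still be read from the source.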
The guarantee given by Pathway's persistent recovery is at-least-once delivery. Internally, the inputs are split into smaller transactional batches. If the program is interrupted during execution, the outputs corresponding to the unclosed transactional batches may be duplicated. However, this shouldn't happen on graceful program termination, where exactly-once semantics can be guaranteed.
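The difference between the two cases can be sketched with a toy simulation. It is an illustration under simplifying assumptions, not a description of Pathway's internals: the `run` function, the in-memory checkpoint, and the list used as a sink are hypothetical. A batch counts as closed only after its output has been written and the checkpoint has advanced, so an interruption between these two steps causes the batch to be emitted again on restart.

```python
def run(batches, checkpoint, sink, crash_before_commit_of=None):
    """Process batches starting from checkpoint["next_batch"].

    `crash_before_commit_of` simulates an interruption right after the output
    of that batch was written but before the batch was marked as closed.
    Returns True if all batches were processed, False if interrupted.
    """
    for i in range(checkpoint["next_batch"], len(batches)):
        sink.extend(batches[i])           # the output is written first
        if i == crash_before_commit_of:
            return False                  # interrupted: batch i stays unclosed
        checkpoint["next_batch"] = i + 1  # batch i is now closed
    return True


batches = [["a", "b"], ["c"], ["d", "e"]]

# Interrupted run followed by a restart: batch 1 is emitted twice.
checkpoint, sink = {"next_batch": 0}, []
run(batches, checkpoint, sink, crash_before_commit_of=1)
run(batches, checkpoint, sink)
interrupted_output = list(sink)

# Uninterrupted run: every batch is emitted exactly once.
checkpoint, sink = {"next_batch": 0}, []
run(batches, checkpoint, sink)
graceful_output = list(sink)

print(interrupted_output)
print(graceful_output)
```

In the interrupted case no entry is lost, but "c" appears twice, which is what at-least-once delivery permits.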