.. ################################################################################
     Licensed to the Apache Software Foundation (ASF) under one
     or more contributor license agreements.  See the NOTICE file
     distributed with this work for additional information
     regarding copyright ownership.  The ASF licenses this file
     to you under the Apache License, Version 2.0 (the
     "License"); you may not use this file except in compliance
     with the License.  You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
   ################################################################################


This section covers state management and fault tolerance in PyFlink applications.

.. note::

    This documentation is under development. For now, please refer to the
    :doc:`DataStream API Reference </reference/pyflink.datastream/index>` for detailed state management documentation.

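
State is a fundamental concept in stream processing that allows you to maintain data across multiple events. PyFlink provides several types of keyed state, including ``ValueState``, ``ListState``, ``MapState``, ``ReducingState``, and ``AggregatingState``. The following example uses ``ValueState`` to keep a running count of the elements seen for each key:
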

.. code-block:: python

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import RuntimeContext, MapFunction
    from pyflink.datastream.state import ValueStateDescriptor


    class CountFunction(MapFunction):
        """Emits each element together with the running count for its key."""

        def open(self, runtime_context: RuntimeContext):
            # Define the state descriptor and obtain a handle to the keyed state
            state_desc = ValueStateDescriptor('count', Types.LONG())
            self.count_state = runtime_context.get_state(state_desc)

        def map(self, value):
            # Get the current count for this key (None on first access)
            current_count = self.count_state.value()
            if current_count is None:
                current_count = 0

            # Increment the count and write it back to state
            current_count += 1
            self.count_state.update(current_count)

            return value, current_count


    env = StreamExecutionEnvironment.get_execution_environment()
    ds = env.from_collection([1, 2, 3, 1, 2, 1])

    # Keyed state is only available after key_by
    ds = ds.map(lambda x: (x, 1)) \
        .key_by(lambda x: x[0]) \
        .map(CountFunction())

    ds.print()

    # Nothing runs until the job is submitted
    env.execute()
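
To make the per-key behaviour of ``ValueState`` concrete without starting a Flink job, the following plain-Python sketch mimics the counting logic above. It is an illustration only: ``count_per_key`` is a hypothetical helper, not part of the PyFlink API, and the dictionary merely stands in for state that Flink would manage and checkpoint in its state backend. The sketch also keys directly on the element rather than on a ``(x, 1)`` tuple, to keep it short.

```python
from collections import defaultdict
from typing import Hashable, Iterable, List, Tuple


def count_per_key(events: Iterable[Hashable]) -> List[Tuple[Hashable, int]]:
    """Return (event, running_count) pairs, keeping one counter per key.

    Hypothetical helper for illustration: the dict plays the role of keyed
    ValueState, where each key only ever sees its own stored value.
    """
    counts = defaultdict(int)  # key -> count, one entry per distinct key
    results = []
    for event in events:
        # Equivalent in spirit to: value() -> add 1 -> update()
        counts[event] += 1
        results.append((event, counts[event]))
    return results


if __name__ == "__main__":
    # Same input collection as the PyFlink example
    for pair in count_per_key([1, 2, 3, 1, 2, 1]):
        print(pair)
```

Each key's count advances independently of the others, which is the property keyed state provides. In a real job running with parallelism greater than one, the relative order of output across different keys is not guaranteed, so the printed order may differ from this sequential sketch.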