examples/notebooks/tutorials/custom_reducers.ipynb
<a href="https://colab.research.google.com/github/pathwaycom/pathway/blob/main/examples/notebooks/tutorials/custom_reducers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
In the cell below, we install Pathway into a Python 3.10+ Linux runtime.
If you are running in Google Colab, please run the colab notebook (Ctrl+F9), disregarding the 'not authored by Google' warning.
The installation and loading time is less than 1 minute.
%%capture --no-display
!pip install --prefer-binary pathway
import logging
logging.basicConfig(level=logging.CRITICAL)
Pathway natively supports aggregation using a wide range of reducers, e.g., sum, count, or max. However, these might not cover all the ways you need to aggregate values. In this tutorial, you will learn how to write reducers implementing custom logic.
For example, let's implement a custom stateful stdev reducer that computes the standard deviation.
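The accumulator only needs to track three running quantities, the count, the sum, and the sum of squares, thanks to the identity Var(X) = E[X^2] - (E[X])^2. Here is a quick plain-Python sanity check of that identity, independent of Pathway (the function name is illustrative, not part of any API):

```python
import math
import statistics

def std_dev_from_moments(values):
    # The same three quantities the accumulator will hold:
    # count, sum, and sum of squares.
    cnt = len(values)
    total = sum(values)
    total_sq = sum(v * v for v in values)
    mean = total / cnt
    # Population variance via E[X^2] - (E[X])^2, then square root.
    return math.sqrt(total_sq / cnt - mean**2)

data = [28.0, 23.1, 24.5, 26.0, 28.3, 25.7]
# statistics.pstdev is the population standard deviation (divides by cnt),
# matching the moments-based formula above.
assert math.isclose(std_dev_from_moments(data), statistics.pstdev(data))
```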
import pathway as pw
# To use advanced features with Pathway Scale, get your free license key from
# https://pathway.com/features and paste it below.
# To use Pathway Community, comment out the line below.
pw.set_license_key("demo-license-key-with-telemetry")
SHOW_DEBUG = False
class StdDevAccumulator(pw.BaseCustomAccumulator):
    def __init__(self, cnt, sum, sum_sq):
        self.cnt = cnt
        self.sum = sum
        self.sum_sq = sum_sq

    @classmethod
    def from_row(cls, row):
        [val] = row
        if SHOW_DEBUG:
            print("from_row()")
        return cls(1, val, val**2)

    def update(self, other):
        self.cnt += other.cnt
        self.sum += other.sum
        self.sum_sq += other.sum_sq
        if SHOW_DEBUG:
            print("update()")

    def compute_result(self) -> float:
        mean = self.sum / self.cnt
        mean_sq = self.sum_sq / self.cnt
        if SHOW_DEBUG:
            print("compute_result()")
        # The square root of the variance gives the standard deviation.
        return (mean_sq - mean**2) ** 0.5
stddev = pw.reducers.udf_reducer(StdDevAccumulator)
Above, the pw.BaseCustomAccumulator class is used as a base for the StdDevAccumulator, which describes the logic of the underlying accumulator. The accumulator class requires a few methods:
- from_row, which constructs an accumulator from the values of a single row of a table (here, a single value, since our reducer applies to a single column),
- update, which updates one accumulator with the contents of another accumulator,
- compute_result, which produces the output based on the accumulator state,
- retract, an optional method, which processes negative updates,
- neutral, an optional method, which returns the state corresponding to consuming 0 rows.

Now, let's see the reducer in action.
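The class above implements only the required methods. As a Pathway-independent sketch of the optional neutral method (the class and names below are illustrative, not part of the tutorial's reducer), the zero-row state holds all-zero moments, and merging it into any accumulator is a no-op:

```python
class MomentsAcc:
    """Illustrative stand-in for the accumulator's three fields."""

    def __init__(self, cnt, total, total_sq):
        self.cnt = cnt
        self.total = total
        self.total_sq = total_sq

    @classmethod
    def neutral(cls):
        # State corresponding to consuming 0 rows.
        return cls(0, 0.0, 0.0)

    def update(self, other):
        self.cnt += other.cnt
        self.total += other.total
        self.total_sq += other.total_sq

acc = MomentsAcc(3, 75.6, 1918.06)  # some already-aggregated state
acc.update(MomentsAcc.neutral())    # merging the neutral state changes nothing
assert (acc.cnt, acc.total, acc.total_sq) == (3, 75.6, 1918.06)
```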
temperature_data = pw.debug.table_from_markdown(
"""
date | temperature
2023-06-06 | 28.0
2023-06-07 | 23.1
2023-06-08 | 24.5
2023-06-09 | 26.0
2023-06-10 | 28.3
2023-06-11 | 25.7
"""
)
temperature_statistics = temperature_data.reduce(
avg=pw.reducers.avg(pw.this.temperature), stddev=stddev(pw.this.temperature)
)
pw.debug.compute_and_print(temperature_statistics)
However, with this logic, our reducer does not process negative updates cleverly: it restarts the computation from scratch whenever a negative update is encountered. You can see this in action by enabling debug information and processing a table in which a row removal happens. In the markdown below, the special __time__ column simulates processing time, and __diff__ marks whether a row is inserted (1) or removed (-1). Let's insert several values at time 0, and then, at time 2, remove one already-inserted value and add another.
SHOW_DEBUG = True
temperature_data_with_updates = pw.debug.table_from_markdown(
"""
date | temperature | __time__ | __diff__
2023-06-06 | 28.0 | 0 | 1
2023-06-07 | 23.1 | 0 | 1
2023-06-08 | 24.5 | 0 | 1
2023-06-09 | 26.0 | 0 | 1
2023-06-10 | 28.3 | 0 | 1
2023-06-11 | 25.7 | 0 | 1
2023-06-11 | 25.7 | 2 | -1
2023-06-11 | 25.9 | 2 | 1
"""
)
temperature_statistics_with_updates = temperature_data_with_updates.reduce(
avg=pw.reducers.avg(pw.this.temperature), stddev=stddev(pw.this.temperature)
)
pw.debug.compute_and_print(temperature_statistics_with_updates)
This can be alleviated by extending our reducer with a method for processing negative updates.
class ImprovedStdDevAccumulator(StdDevAccumulator):
    def retract(self, other):
        self.cnt -= other.cnt
        self.sum -= other.sum
        self.sum_sq -= other.sum_sq
        if SHOW_DEBUG:
            print("retract()")
improved_stddev = pw.reducers.udf_reducer(ImprovedStdDevAccumulator)
And now you can test the improved reducer in action.
temperature_statistics_improved = temperature_data_with_updates.reduce(
avg=pw.reducers.avg(pw.this.temperature),
stddev=improved_stddev(pw.this.temperature),
)
pw.debug.compute_and_print(temperature_statistics_improved)
In the example above, 10 calls to update() and 12 calls to from_row() are replaced with 6 calls to update(), 1 call to retract(), and 8 calls to from_row().
This comes from the fact that the former reducer:
- calls from_row() for each row of the table, wrapping each single value into a separate StdDevAccumulator object,
- calls update() for each row of the table except the first one consumed,
- repeats all of the above from scratch at time 2, when the removal arrives.

The latter reducer aggregates the table at time 0 in the same way as the former one, but processes the change at time 2 differently:
- it makes only two from_row() calls, one for the removed row and one for the inserted row,
- and then calls retract() once and update() once.
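The saving generalizes: with retract, replacing one row costs only constant work on the running moments, and the incremental result matches a full recomputation. A simplified, Pathway-independent model of this equivalence (illustrative names, not the Pathway API):

```python
import statistics

class Moments:
    # Running count, sum, and sum of squares, as in the accumulator above,
    # but updated with raw values rather than merged with other accumulators.
    def __init__(self):
        self.cnt = 0
        self.total = 0.0
        self.total_sq = 0.0

    def update(self, v):
        self.cnt += 1
        self.total += v
        self.total_sq += v * v

    def retract(self, v):
        # Undo a previous update without touching the other rows.
        self.cnt -= 1
        self.total -= v
        self.total_sq -= v * v

    def std_dev(self):
        mean = self.total / self.cnt
        return (self.total_sq / self.cnt - mean**2) ** 0.5

acc = Moments()
for v in [28.0, 23.1, 24.5, 26.0, 28.3, 25.7]:
    acc.update(v)

# Time 2: drop 25.7 and insert 25.9, incrementally.
acc.retract(25.7)
acc.update(25.9)

# The incremental result agrees with recomputing from scratch.
expected = statistics.pstdev([28.0, 23.1, 24.5, 26.0, 28.3, 25.9])
assert abs(acc.std_dev() - expected) < 1e-9
```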