python/examples/advanced/Transaction_Examples.ipynb
Transactions allow several profiles to be committed to WhyLabs as a group. Let's start with some setup.
!pip install whylogs
import whylogs as why
from whylabs_client.api.transactions_api import TransactionsApi
from whylogs.core.schema import DatasetSchema
from whylogs.core.segmentation_partition import segment_on_column
from whylogs.api.writer.whylabs import WhyLabsWriter
from whylogs.api.writer.whylabs_transaction_writer import WhyLabsTransactionWriter
import os
from uuid import uuid4
from whylogs.datasets import Ecommerce
import numpy as np
import pandas as pd
from datetime import datetime, timedelta, timezone
os.environ["WHYLABS_DEFAULT_ORG_ID"] = "org-XXX"
os.environ["WHYLABS_DEFAULT_DATASET_ID"] = "model-XXX"
os.environ["WHYLABS_API_KEY"] = "XXXX:org-XXX"
dataset = Ecommerce()
daily_batches = dataset.get_inference_data(number_batches=20)
list_daily_batches = list(daily_batches)
columns = ['product','sales_last_week','market_price','rating','category','output_discount','output_prediction','output_score']
df = list_daily_batches[0].data[columns]
df.head()
why.init(force_local=True)
writer = WhyLabsWriter()
WhyLabsWriter::start_transaction() signals the start of a transaction. Profiles sent to WhyLabs with WhyLabsWriter::write() during the transaction are uploaded to WhyLabs immediately, but won't be processed until WhyLabsWriter::commit_transaction() is called.
transaction_id = writer.start_transaction()
print(f"Started transaction {transaction_id}")
for i in range(5):
    batch_df = list_daily_batches[i].data[columns]
    profile = why.log(batch_df)
    # Backdate each batch by one more day so the profiles land on distinct days
    timestamp = datetime.now(tz=timezone.utc) - timedelta(days=i+1)
    profile.set_dataset_timestamp(timestamp)
    status, id = writer.write(profile)
    print(status, id)
writer.commit_transaction()
print("Committed transaction")
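The staging behavior described above can be pictured with a small toy model. This is not how whylogs or WhyLabs is implemented: `ToyTransactionStore` and all of its methods are hypothetical names, used only to illustrate that profiles written inside a transaction stay unprocessed until the commit.

```python
from uuid import uuid4


class ToyTransactionStore:
    """Hypothetical in-memory stand-in for a transactional backend (not the whylogs API)."""

    def __init__(self):
        self.processed = []  # profiles visible to downstream processing
        self._staged = {}    # transaction id -> profiles uploaded but not yet processed

    def start_transaction(self):
        transaction_id = str(uuid4())
        self._staged[transaction_id] = []
        return transaction_id

    def write(self, transaction_id, profile):
        # The profile is stored right away, but only staged until the commit.
        self._staged[transaction_id].append(profile)
        return True

    def commit_transaction(self, transaction_id):
        # All staged profiles become visible together.
        self.processed.extend(self._staged.pop(transaction_id))


store = ToyTransactionStore()
tid = store.start_transaction()
for name in ["day-1", "day-2", "day-3"]:
    store.write(tid, name)
visible_before_commit = list(store.processed)  # nothing processed yet
store.commit_transaction(tid)
print(visible_before_commit, store.processed)
```

The point of the model is the two-phase shape: uploads can happen one at a time, while visibility changes once, at the commit.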
The WhyLabsTransactionWriter can be used as a context manager to simplify transaction error handling and ensure commit_transaction() is called.
timestamp = datetime.now(tz=timezone.utc) - timedelta(days=2)
timestamp
try:
    with WhyLabsTransactionWriter() as writer:
        print("Started transaction")
        for i in range(5):
            batch_df = list_daily_batches[i].data[columns]
            # Log the current batch (the original logged `df` and left batch_df unused)
            profile = why.log(batch_df)
            profile.set_dataset_timestamp(timestamp)
            status, id = writer.write(profile)
            print(status, id)
except Exception:
    print("Transaction failed")
else:
    # Only report success when the with-block exited without raising
    print("Committed transaction")
If a write() call during the transaction fails (returns a False status), the transaction's commit fails and raises an exception.
Each segment in a segmented profile is uploaded to WhyLabs in a separate S3 interaction. Segmented profiles can be sent as a transaction so that all the segments are committed to WhyLabs at once. In this case, the status returned from WhyLabsWriter::write() is the logical AND of the per-segment statuses, and the returned id is a list of all the segment ids.
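A minimal sketch of this failure rule, using a hypothetical `ToyTransaction` context manager rather than the real `WhyLabsTransactionWriter`: each `write()` records its status, and leaving the `with` block raises if any status was `False`. The class name, the `fail_on` parameter, and the exception type are assumptions made for illustration.

```python
class ToyTransaction:
    """Hypothetical context manager; mimics only the commit-fails-on-bad-write rule."""

    def __init__(self, fail_on=()):
        self.fail_on = set(fail_on)  # payloads whose write should report failure
        self.statuses = []
        self.committed = False

    def __enter__(self):
        return self

    def write(self, payload):
        status = payload not in self.fail_on
        self.statuses.append(status)
        return status, f"id-{payload}"

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            # Commit on exit; refuse if any write reported failure.
            if not all(self.statuses):
                raise RuntimeError("commit failed: at least one write was unsuccessful")
            self.committed = True
        return False  # never swallow exceptions raised inside the with-body


def run(payloads, fail_on=()):
    try:
        with ToyTransaction(fail_on=fail_on) as txn:
            for p in payloads:
                txn.write(p)
    except RuntimeError:
        return "failed"
    return "committed"


print(run(["a", "b"]))
print(run(["a", "b"], fail_on=["b"]))
```

Because the error surfaces when the `with` block exits, not at the failing `write()`, wrapping the whole block in `try`/`except` is enough to catch it.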
schema = DatasetSchema(segments=segment_on_column("output_discount"))
profile = why.log(df, schema=schema)
with WhyLabsTransactionWriter() as writer:
    status, id = writer.write(profile)
    print(f"{status} {id}")
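The way per-segment results roll up into a single return value can be sketched as follows, assuming each segment upload yields a `(status, id)` pair. `combine_segment_results` is a hypothetical helper written for illustration, not part of whylogs.

```python
from typing import List, Tuple


def combine_segment_results(results: List[Tuple[bool, str]]) -> Tuple[bool, List[str]]:
    """Return the logical AND of all statuses and the list of ids, in order."""
    statuses = [status for status, _ in results]
    ids = [segment_id for _, segment_id in results]
    return all(statuses), ids


ok = combine_segment_results([(True, "seg-0"), (True, "seg-1")])
partial = combine_segment_results([(True, "seg-0"), (False, "seg-1")])
print(ok)       # (True, ['seg-0', 'seg-1'])
print(partial)  # (False, ['seg-0', 'seg-1'])
```

A single failed segment is enough to make the combined status `False`, which is why a transaction is useful here: either every segment is committed or none is.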