console/doc/design/20231204_source_progress.md
Today, when a user creates a source in the console, they have no way to answer the following questions without writing SQL:
A user should be able to see details for a given source, including:
Basically we want to expose the values in mz_source_statistics to users.
Currently, we don't have a way to get the total size of a source in bytes or rows, so we are ignoring that until the data are available in Materialize.
Parker H has already designed a page that will expose this information. The focus of this document is the details of how we get the data on that page.
We will update the existing query to retrieve the source statistics in
conjunction with the status. If the source is running and has no snapshot
committed, we will display a "Snapshotting" StatusPill.
```sql
SELECT
    s.id,
    s.name,
    s.type,
    s.size,
    sc.name AS "schemaName",
    d.name AS "databaseName",
    status.status,
    status.error,
    stats.snapshot_committed,
    owners."isOwner"
FROM
    mz_catalog.mz_sources AS s
    JOIN mz_catalog.mz_schemas AS sc ON sc.id = s.schema_id
    JOIN mz_catalog.mz_databases AS d ON d.id = sc.database_id
    JOIN
        (
            SELECT
                r.id,
                (mz_is_superuser() OR has_role(current_user, r.oid, 'USAGE'))
                    AS "isOwner"
            FROM mz_catalog.mz_roles AS r
        ) AS owners ON owners.id = s.owner_id
    LEFT JOIN mz_internal.mz_source_statuses AS status ON status.id = s.id
    LEFT JOIN mz_internal.mz_source_statistics AS stats ON stats.id = s.id
WHERE s.id LIKE 'u%' AND s.type <> 'progress' AND s.type <> 'subsource'
ORDER BY d.name, sc.name, name;
```
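The "Snapshotting" StatusPill logic described above can be sketched as a small helper. This is illustrative, not the actual console code; the type and function names are assumptions.

```typescript
// A running source whose snapshot has not yet been committed is displayed as
// "Snapshotting" rather than "running". All other statuses pass through.
type SourceStatus = "created" | "starting" | "running" | "stalled" | "failed";

function displayStatus(
  status: SourceStatus,
  snapshotCommitted: boolean | null
): string {
  if (status === "running" && snapshotCommitted === false) {
    return "Snapshotting";
  }
  return status;
}
```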
This will be a new hook that subscribes to the source statistics and bins the
data for display in graphs, very similar to useClusterUtilization. Gus has
committed to enabling 30 day retention on this table, so we will be able to set
an appropriate AS OF to fetch historical data. Because these metrics are per
worker, we have to sum the values across all workers.
```sql
SUBSCRIBE (
    WITH
        subsources AS
        (
            SELECT od.referenced_object_id AS id, s.id AS "sourceId"
            FROM
                mz_catalog.mz_sources AS s
                JOIN mz_internal.mz_object_dependencies AS od
                    ON od.object_id = s.id
            WHERE s.id = $1 AND s.type <> $2
        ),
        sources AS
        (
            SELECT s.id, s.id AS "sourceId"
            FROM mz_catalog.mz_sources AS s
            WHERE s.id = $3
        ),
        combined_sources AS
        (
            SELECT id, "sourceId" FROM subsources
            UNION ALL SELECT id, "sourceId" FROM sources
        )
    SELECT
        s."sourceId" AS id,
        sum(messages_received) AS "messagesReceived",
        sum(bytes_received) AS "bytesReceived",
        sum(updates_staged) AS "updatesStaged",
        sum(updates_committed) AS "updatesCommitted",
        max(offset_known) AS "offsetKnown",
        max(offset_committed) AS "offsetCommitted",
        -- Rehydration latency is only meaningful once every worker has
        -- rehydrated, so report NULL until then.
        CASE
            WHEN bool_or(rehydration_latency IS NULL) THEN NULL
            ELSE max(rehydration_latency)
        END AS "rehydrationLatency"
    FROM
        combined_sources AS s
        JOIN mz_internal.mz_source_statistics AS ss ON ss.id = s.id
    GROUP BY s."sourceId"
)
WITH (PROGRESS)
AS OF AT LEAST '2024-03-12T22:10:51.432Z'::timestamp
ENVELOPE UPSERT (KEY (id));
```
The timestamp will be calculated on the client: now - selectedTimePeriod.
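A minimal sketch of that client-side calculation; the `selectedTimePeriodMs` parameter name is an assumption, not the actual console identifier.

```typescript
// Compute the AS OF AT LEAST timestamp by subtracting the selected lookback
// window (in milliseconds) from the current time.
function asOfTimestamp(
  selectedTimePeriodMs: number,
  now: Date = new Date()
): string {
  return new Date(now.getTime() - selectedTimePeriodMs).toISOString();
}
```

For example, `asOfTimestamp(24 * 60 * 60 * 1000)` yields an ISO timestamp 24 hours in the past, suitable for interpolation into the `AS OF AT LEAST` clause.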
The results we get back are cumulative values since the last restart, so we will want to calculate a rate.
```typescript
// Minimal shapes for the fields used below.
interface CumulativeValues {
  id: string;
  timestamp: Date;
  bytesReceived: number;
}

interface IngestionRate {
  id: string;
  timestamp: Date;
  bytesPerSecond: number;
}

function rateFromCumulativeValues(data: CumulativeValues[]): IngestionRate[] {
  const rates: IngestionRate[] = [];
  for (let i = 1; i < data.length; i++) {
    const prev = data[i - 1];
    const curr = data[i];
    // Diff each point against the previous one and normalize to a
    // per-second rate.
    const timeDiff = curr.timestamp.getTime() - prev.timestamp.getTime();
    const bytesPerSecond =
      (curr.bytesReceived - prev.bytesReceived) / (timeDiff / 1000);
    rates.push({
      id: curr.id,
      timestamp: curr.timestamp,
      bytesPerSecond,
    });
  }
  return rates;
}
```
A new hook to fetch cluster replica metrics. There is an open question around which replica to pick when there are multiple; this proposal shows the metrics from the largest replica. Since the design calls for displaying absolute values, we have to look up the replica capacity, then work backwards to approximate, e.g., memory usage from the total and the percentage.
```sql
SELECT
    cr.id,
    cr.name,
    cr.size,
    crs.cpu_nano_cores * crs.processes AS "cpuNanoCores",
    crs.memory_bytes * crs.processes AS "memoryBytes",
    crs.disk_bytes * crs.processes AS "diskBytes",
    cru.cpu_percent AS "cpuPercent",
    cru.memory_percent AS "memoryPercent",
    cru.disk_percent AS "diskPercent"
FROM
    mz_catalog.mz_cluster_replicas AS cr
    JOIN mz_internal.mz_cluster_replica_sizes AS crs ON crs.size = cr.size
    JOIN
        (
            SELECT
                replica_id,
                max(cru.cpu_percent) AS cpu_percent,
                max(cru.memory_percent) AS memory_percent,
                max(cru.disk_percent) AS disk_percent
            FROM mz_internal.mz_cluster_replica_utilization AS cru
            GROUP BY replica_id
        ) AS cru ON cr.id = cru.replica_id
WHERE cr.cluster_id = $1
ORDER BY cr.id;
```
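The "work backwards" step can be sketched as follows: given the replica's total capacity (e.g. `memoryBytes` from the query above) and the utilization percentage, approximate the absolute usage. The helper name is illustrative.

```typescript
// Approximate absolute usage from total capacity and a utilization
// percentage (0-100), e.g. memory used = memoryBytes * memoryPercent / 100.
function approximateUsageBytes(
  capacityBytes: number,
  utilizationPercent: number
): number {
  return Math.round(capacityBytes * (utilizationPercent / 100));
}
```

For a replica with 8 GiB of memory at 25% utilization, this yields roughly 2 GiB.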
A new hook that returns the most recent statistics. We will also get this data
from the subscribe for the graphs, but in the interest of component boundaries,
we will fetch this data in the component that needs it. It will use the same
SELECT as useSourceStatistics, except without the SUBSCRIBE.
A new hook that returns the total size of a source. This data is only updated once an hour, so we should indicate this to the user somehow.
```sql
SELECT object_id, sum(size_bytes), max(collection_timestamp)
FROM mz_storage_usage
WHERE object_id = 'u2080'
GROUP BY object_id;
```
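One way to surface the hourly staleness to the user is a relative "last updated" label derived from `collection_timestamp`. A minimal sketch; the helper name is an assumption, not the actual console code.

```typescript
// Format how long ago the storage usage figure was collected, so the UI can
// indicate that the size is only refreshed hourly.
function staleness(collectionTimestamp: Date, now: Date = new Date()): string {
  const minutes = Math.floor(
    (now.getTime() - collectionTimestamp.getTime()) / 60_000
  );
  return `Updated ${minutes} minute${minutes === 1 ? "" : "s"} ago`;
}
```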
The root route in SourceDetail currently redirects to the errors route; it
will instead render a new SourceOverview component, similar to how the
clusters components are structured. This component will be responsible for
fetching the data using the hooks described above, as well as rendering loading
and error states. The exact design of the graph components is TBD, but ideally
we can use a single SourceStatisticsGraph component that accepts a dataKey and
title, as well as the data returned from the useSourceStatistics hook.
An RTL test suite, similar to ClusterOverview.test.tsx, will verify that,
given mock data, the components render the appropriate loading, error, and
success states. We won't try to validate the graphs beyond verifying that we
rendered something that doesn't appear to be an error state.
I didn't seriously entertain any alternative designs; I'm happy to hear suggestions if I've missed a better approach.
If a source cluster has multiple replicas, which metrics should we display? For now, since sources can only have a single replica, we chose to just use the first replica returned.
We've decided this is out of scope, since there is no safe way to query this data.