docs/published/handbook/engineering/clickhouse/replication.md
This document provides information on:
- setting up replicated tables on a cluster,
- sharding and the Distributed table engine,
- how distributed queries are executed and how to keep them efficient.
A great guide on setting up replicated tables on a pre-existing cluster can be found in the ClickHouse documentation.
Some important highlights are:
- Use `ON CLUSTER` for DDL statements so that changes are applied on every node.
- Replication is coordinated through ZooKeeper or `clickhouse-keeper`.
- Always use unique ZooKeeper paths for table definitions, as re-use can and will lead to data loss. This applies even if the previous table has been dropped.
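For example, when recreating a previously dropped table, pick a fresh ZooKeeper path instead of re-using the old one. A minimal sketch (the table and path names here are hypothetical):

```sql
-- Recreate the table under a new, versioned ZooKeeper path ("_v2")
-- rather than re-using the path of the dropped table.
CREATE TABLE sensor_values_v2 ON CLUSTER 'my_cluster' (
    timestamp DateTime,
    site_id UInt32,
    metric_value Int32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sensor_values_v2', '{replica}')
ORDER BY (site_id, timestamp);
```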
Sharding helps scale a dataset by having each node store only part of the data.
To decide whether to shard a table, consider how it's queried and what data it stores.
Sharding also requires care in schema design: queries should ideally only need to load data from a single shard.
When creating a replicated table, whether it is sharded or not is configured by varying the parameters of the ReplicatedMergeTree engine:
```sql
-- Sharded: replicas within a shard share a ZooKeeper path,
-- so each shard holds a distinct portion of the data
ReplicatedMergeTree('/zk/some/path/{shard}/tablename', '{replica}')

-- Not sharded: every node is a replica of the same table and holds all data
ReplicatedMergeTree('/zk/some/path/tablename', '{replica}-{shard}')
```

Note that resharding large tables is currently a relatively painful and bespoke operation, so be careful to choose a good sharding key.
When doing larger cluster operations, it's often important to keep an eye on replication. The `system.replication_queue` and `system.replicated_fetches` tables provide an at-a-glance overview of what the system is doing.
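For instance, a quick sketch of surfacing stuck replication tasks across the whole cluster (re-using the `my_cluster` name from the examples below):

```sql
-- Show replication queue entries that have been retried the most,
-- together with their last error message.
SELECT hostName(), database, table, type, num_tries, last_exception
FROM clusterAllReplicas('my_cluster', 'system', 'replication_queue')
ORDER BY num_tries DESC
LIMIT 10;
```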
Distributed table engine tables are used to query and write to sharded tables. Note that Distributed engine tables do not store any data on their own but rather always fan out to ReplicatedMergeTree tables on the cluster.
When INSERTing data into Distributed tables, ClickHouse decides which shard each row belongs to and forwards the data to the relevant shard(s) based on the sharding_key.
Note that if your underlying table has columns that ClickHouse populates (e.g. ALIAS, MATERIALIZED), it's often necessary to set up two Distributed tables: one for writes that lists only the physically inserted columns, and one for reads that lists all columns. A sketch of this pattern is shown below.
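A minimal sketch of the two-table pattern (all table names here are hypothetical):

```sql
-- Underlying table with a column ClickHouse populates on insert
CREATE TABLE sensor_values_local ON CLUSTER 'my_cluster' (
    timestamp DateTime,
    site_id UInt32,
    day Date MATERIALIZED toDate(timestamp)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sensor_values_local', '{replica}')
ORDER BY (site_id, timestamp);

-- Distributed table for writes: only the physically inserted columns
CREATE TABLE sensor_values_writes ON CLUSTER 'my_cluster' (
    timestamp DateTime,
    site_id UInt32
)
ENGINE = Distributed('my_cluster', 'default', 'sensor_values_local', intHash64(site_id));

-- Distributed table for reads: all columns, including the MATERIALIZED one
CREATE TABLE sensor_values_reads ON CLUSTER 'my_cluster' (
    timestamp DateTime,
    site_id UInt32,
    day Date
)
ENGINE = Distributed('my_cluster', 'default', 'sensor_values_local', intHash64(site_id));
```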
When querying a Distributed table, you can send the query to any node in the ClickHouse cluster. That node becomes the coordinator, which:
- rewrites the query to target the underlying table,
- sends it to one replica of each shard,
- merges the partial results and returns the final result to the client.
Given local execution is faster than reading data over the network, ClickHouse will usually execute one of the sub-queries locally instead of sending it to another replica of its own shard.
Depending on the query, sub-queries executed on other shards might either return already-aggregated data or stream entire datasets across the network. Being aware of which happens is crucial for performance.
Consider the following tables:
```sql
CREATE TABLE sharded_sensor_values ON CLUSTER 'my_cluster' (
timestamp DateTime,
site_id UInt32,
event VARCHAR,
uuid UUID,
metric_value Int32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sharded_sensor_values', '{replica}')
ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)
SETTINGS index_granularity = 8192;
```

```sql
CREATE TABLE distributed_sensor_values ON CLUSTER 'my_cluster' (
timestamp DateTime,
site_id UInt32,
event VARCHAR,
uuid UUID,
metric_value Int32
)
ENGINE = Distributed('my_cluster', 'default', 'sharded_sensor_values', intHash64(site_id));
```
Writes and queries should be made against the distributed_sensor_values table in this schema. It then distributes the data across shards according to the sharding key, intHash64(site_id).
```sql
INSERT INTO distributed_sensor_values
SELECT *
FROM generateRandom('timestamp DateTime, site_id UInt8, event VARCHAR, uuid UUID, metric_value Int32', NULL, 10)
LIMIT 100000000;
```
Consider this simple aggregation query executed against clickhouse01:
```sql
SELECT hostName(), sum(metric_value) FROM distributed_sensor_values GROUP BY hostName()

-- Results:
-- ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓
-- ┃ hostName()   ┃ sum(metric_value) ┃
-- ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩
-- │ clickhouse01 │    -9035836479117 │
-- ├──────────────┼───────────────────┤
-- │ clickhouse03 │    10003905228283 │
-- └──────────────┴───────────────────┘
```
hostName is a ClickHouse helper function which returns the hostname of the server the query is executed on.
In this case clickhouse01 was the coordinator node. It:
- executed the query locally against its own shard, and
- sent a rewritten query to clickhouse03 on the other shard to execute. The rewritten query was:

```sql
SELECT hostName(), sum(`metric_value`) FROM `default`.`sharded_sensor_values` GROUP BY hostName()
```

In this case, minimal network traffic happened since the results of the per-shard queries could be combined independently.
<details><summary>Click to see full `EXPLAIN` plan</summary>

```
Expression ((Projection + Before ORDER BY))
Header: hostname() String
sum(metric_value) Int64
MergingAggregated
Header: hostname() String
sum(metric_value) Int64
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: hostname() String
sum(metric_value) AggregateFunction(sum, Int32)
Union
Header: hostname() String
sum(metric_value) AggregateFunction(sum, Int32)
Aggregating
Header: hostname() String
sum(metric_value) AggregateFunction(sum, Int32)
Expression (Before GROUP BY)
Header: metric_value Int32
hostname() String
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: metric_value Int32
ReadFromMergeTree
Header: metric_value Int32
Indexes:
PrimaryKey
Condition: true
Parts: 6/6
Granules: 5723/5723
ReadFromRemote (Read from remote replica)
Header: hostname() String
sum(metric_value) AggregateFunction(sum, Int32)
```

</details>
Consider this query:
```sql
SELECT
site_id,
uniq(event)
FROM distributed_sensor_values
WHERE timestamp > '2010-01-01' and timestamp < '2023-01-01'
GROUP BY site_id
ORDER BY uniq(event) DESC
LIMIT 20
```
In this case, the sub-query sent to the other shards cannot do all the work on its own. Instead, the query sent to the other shard looks something like the following:
```sql
SELECT
site_id,
uniqState(event)
FROM sharded_sensor_values
WHERE timestamp > '2010-01-01' and timestamp < '2023-01-01'
GROUP BY site_id
```
In EXPLAIN output, this would be expressed as:
```
ReadFromRemote (Read from remote replica)
Header: site_id UInt32
uniq(event) AggregateFunction(uniq, String)
```
In this case the coordinator needs to receive a lot of data from the other shards to calculate the correct results.
This query is expensive in terms of the amount of data that needs to be transferred over the network.
One thing that makes this query more efficient is uniqState, an aggregate function combinator. Rather than needing to send over all the events, the remote shard can send back an optimized bitmap-like intermediate state that the coordinator combines with its own results.
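As a minimal sketch of how these intermediate states compose (re-using the sharded_sensor_values table from above), the -State and -Merge combinators can also be used directly:

```sql
-- Each inner row carries a mergeable sketch of unique events per site;
-- uniqMerge combines the sketches into one overall unique count.
SELECT uniqMerge(events_state) AS unique_events_total
FROM
(
    SELECT site_id, uniqState(event) AS events_state
    FROM sharded_sensor_values
    GROUP BY site_id
);
```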
<details><summary>Click to see full `EXPLAIN` plan</summary>

```
Expression (Projection)
Header: site_id UInt32
uniq(event) UInt64
Limit (preliminary LIMIT (without OFFSET))
Header: site_id UInt32
uniq(event) UInt64
Sorting (Sorting for ORDER BY)
Header: site_id UInt32
uniq(event) UInt64
Expression (Before ORDER BY)
Header: site_id UInt32
uniq(event) UInt64
MergingAggregated
Header: site_id UInt32
uniq(event) UInt64
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: site_id UInt32
uniq(event) AggregateFunction(uniq, String)
Union
Header: site_id UInt32
uniq(event) AggregateFunction(uniq, String)
Aggregating
Header: site_id UInt32
uniq(event) AggregateFunction(uniq, String)
Expression (Before GROUP BY)
Header: site_id UInt32
event String
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: site_id UInt32
event String
ReadFromMergeTree
Header: site_id UInt32
event String
Indexes:
PrimaryKey
Keys:
toStartOfDay(timestamp)
Condition: and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))
Parts: 6/6
Granules: 1628/5723
ReadFromRemote (Read from remote replica)
Header: site_id UInt32
uniq(event) AggregateFunction(uniq, String)
```

</details>
This query can be made faster by setting the distributed_group_by_no_merge setting, like so:
```sql
SELECT
    site_id,
    uniq(event)
FROM distributed_sensor_values
WHERE timestamp > '2010-01-01' and timestamp < '2023-01-01'
GROUP BY site_id
ORDER BY uniq(event) DESC
LIMIT 20
SETTINGS distributed_group_by_no_merge=1
```
After this, the coordinator knows to trust that the data is sharded according to site_id, so it can send the same query down to the other shards without a final merge step.
In EXPLAIN, this is represented by the ReadFromRemote happening later in the plan and now reading UInt64 values instead of AggregateFunction(uniq, String):
```
ReadFromRemote (Read from remote replica)
Header: site_id UInt32
uniq(event) UInt64
```
Takeaway: Proper data layout and usage of query settings can improve queries significantly by doing less work over the network.
<details><summary>Click to see full `EXPLAIN` plan</summary>

```
Header: site_id UInt32
uniq(event) UInt64
Union
Header: site_id UInt32
uniq(event) UInt64
Expression (Projection)
Header: site_id UInt32
uniq(event) UInt64
Limit (preliminary LIMIT (without OFFSET))
Header: site_id UInt32
uniq(event) UInt64
Sorting (Sorting for ORDER BY)
Header: site_id UInt32
uniq(event) UInt64
Expression (Before ORDER BY)
Header: site_id UInt32
uniq(event) UInt64
Aggregating
Header: site_id UInt32
uniq(event) UInt64
Expression (Before GROUP BY)
Header: site_id UInt32
event String
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: site_id UInt32
event String
ReadFromMergeTree
Header: site_id UInt32
event String
Indexes:
PrimaryKey
Keys:
toStartOfDay(timestamp)
Condition: and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))
Parts: 6/6
Granules: 1628/5723
ReadFromRemote (Read from remote replica)
Header: site_id UInt32
uniq(event) UInt64
```

</details>
Some noteworthy query settings which affect the behavior of distributed queries are:
- distributed_group_by_no_merge
- optimize_distributed_group_by_sharding_key
- optimize_skip_unused_shards
- prefer_localhost_replica
Many of these unlock potential optimizations by streaming less data over the network, but require data to be sharded correctly to work.
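For example, a sketch of shard pruning with optimize_skip_unused_shards, assuming the distributed_sensor_values table defined above: when the WHERE clause pins down the sharding key, the coordinator can skip shards that cannot contain matching rows entirely.

```sql
-- site_id fully determines the shard via intHash64(site_id),
-- so only one shard needs to be queried at all.
SELECT count()
FROM distributed_sensor_values
WHERE site_id = 42
SETTINGS optimize_skip_unused_shards = 1;
```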
It's sometimes useful to query data from across the cluster without setting up Distributed tables, for example to query system tables on all nodes or shards.
This can be done with the clusterAllReplicas table function, like so:
```sql
SELECT hostName(), shardNum(), *
FROM clusterAllReplicas('my_cluster', 'system', 'metrics')
```
More documentation on this can be found in the ClickHouse documentation for the clusterAllReplicas table function.
Next in the ClickHouse manual: Data ingestion