# internal/proxy/shardclient
The shardclient package provides client-side connection management and load balancing for communicating with QueryNode shards in the Milvus distributed architecture. It manages QueryNode client connections, caches shard leader information, and implements intelligent request routing strategies.
In Milvus, collections are divided into shards (channels), and each shard has multiple replicas distributed across different QueryNodes for high availability and load balancing. The shardclient package is responsible for caching shard leader locations, managing QueryNode connections, and choosing which replica serves each request, as illustrated below:
```
┌──────────────────────────────────────────────────────────────┐
│                         Proxy Layer                          │
│                                                              │
│  ┌────────────────────────────────────────────────────────┐  │
│  │                     ShardClientMgr                     │  │
│  │  • Shard leader cache (database → collection → shards) │  │
│  │  • QueryNode client pool management                    │  │
│  │  • Client lifecycle (init, purge, close)               │  │
│  └────────────────────────────┬───────────────────────────┘  │
│                               │                              │
│  ┌────────────────────────────▼───────────────────────────┐  │
│  │                        LBPolicy                        │  │
│  │  • Execute workload on collection/channels             │  │
│  │  • Retry logic with replica failover                   │  │
│  │  • Node selection via balancer                         │  │
│  └────────────────────────────┬───────────────────────────┘  │
│               ┌───────────────┴───────────────┐              │
│               │                               │              │
│     ┌─────────▼─────────┐           ┌─────────▼───────────┐  │
│     │    RoundRobin     │           │  LookAsideBalancer  │  │
│     │     Balancer      │           │ • Cost-based        │  │
│     │                   │           │ • Health check      │  │
│     └───────────────────┘           └─────────────────────┘  │
│                               │                              │
│  ┌────────────────────────────▼───────────────────────────┐  │
│  │              shardClient (per QueryNode)               │  │
│  │  • Connection pool (configurable size)                 │  │
│  │  • Round-robin client selection                        │  │
│  │  • Lazy initialization and expiration                  │  │
│  └────────────────────────────────────────────────────────┘  │
└───────────────────────────────┬──────────────────────────────┘
                                │ gRPC
                     ┌──────────┴──────────┐
                     │                     │
               ┌─────▼─────┐         ┌─────▼─────┐
               │ QueryNode │         │ QueryNode │
               │    (1)    │         │    (2)    │
               └───────────┘         └───────────┘
```
## ShardClientMgr

The central manager for QueryNode client connections and shard leader information.
File: manager.go
Key Responsibilities:
- Caches shard leader information (database → collectionName → channel → []nodeInfo)
- Manages shardClient instances for each QueryNode

Interface:
```go
type ShardClientMgr interface {
	GetShard(ctx context.Context, withCache bool, database, collectionName string,
		collectionID int64, channel string) ([]nodeInfo, error)
	GetShardLeaderList(ctx context.Context, database, collectionName string,
		collectionID int64, withCache bool) ([]string, error)
	DeprecateShardCache(database, collectionName string)
	InvalidateShardLeaderCache(collections []int64)
	GetClient(ctx context.Context, nodeInfo nodeInfo) (types.QueryNodeClient, error)
	Start()
	Close()
}
```
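As a rough illustration of the flow, the sketch below resolves leaders for one channel and obtains a client for the first reachable replica. The function name, literal behavior, and error handling are illustrative only, not part of the package.

```go
// Hypothetical caller-side sketch (not part of the package). Assumes the
// imports shown in the usage example below plus "fmt".
func getClientForChannel(ctx context.Context, mgr shardclient.ShardClientMgr,
	db, collection string, collectionID int64, channel string) (types.QueryNodeClient, error) {
	// Resolve the current shard leaders, preferring the local cache.
	leaders, err := mgr.GetShard(ctx, true /*withCache*/, db, collection, collectionID, channel)
	if err != nil {
		return nil, err
	}
	// Try each replica leader until one yields a usable client.
	for _, leader := range leaders {
		client, err := mgr.GetClient(ctx, leader)
		if err != nil {
			continue // this replica is unavailable; try the next one
		}
		return client, nil
	}
	return nil, fmt.Errorf("no available shard leader for channel %s", channel)
}
```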
Configuration:
- `purgeInterval`: Interval for checking expired clients (default: 600s)
- `expiredDuration`: Time after which inactive clients are purged (default: 60min)

## shardClient

Manages a connection pool to a single QueryNode.
File: shard_client.go
Features:
- Connection pool per QueryNode (size set by `ProxyCfg.QueryNodePoolingSize`, default: 1)
- Round-robin selection among pooled clients

Lifecycle:
- Lazily initialized on the first `getClient()` call
- `lastActiveTs` is refreshed on each use
- Inactive clients are eventually purged by the ShardClientMgr

## LBPolicy

Executes workloads on collections/channels with retry and failover logic.
File: lb_policy.go
Key Methods:
- `Execute(ctx, CollectionWorkLoad)`: Execute the workload in parallel across all shards
- `ExecuteOneChannel(ctx, CollectionWorkLoad)`: Execute the workload on any single shard (for lightweight operations)
- `ExecuteWithRetry(ctx, ChannelWorkload)`: Execute on a specific channel, retrying on different replicas

Retry Strategy:
- Retries up to `max(retryOnReplica, len(shardLeaders))` times
- Maintains an `excludeNodes` set to avoid retrying failed nodes
- Clears `excludeNodes` once all replicas have been exhausted

A simplified sketch of this retry loop follows the workload types below.

Workload Types:
```go
type ChannelWorkload struct {
	Db             string
	CollectionName string
	CollectionID   int64
	Channel        string
	Nq             int64       // Number of queries
	Exec           ExecuteFunc // Actual work to execute
}

type ExecuteFunc func(context.Context, UniqueID, types.QueryNodeClient, string) error
```
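The retry strategy above can be pictured roughly like this. This is a simplified sketch, not the actual `ExecuteWithRetry` code: the balancer is reduced to a `pickNode` helper, the client lookup is passed in as a callback, and `UniqueID` is assumed to be the usual int64 alias.

```go
// Simplified illustration of replica failover with an exclude set.
func executeWithRetrySketch(ctx context.Context, wl shardclient.ChannelWorkload,
	leaders []int64, getClient func(int64) (types.QueryNodeClient, error), retryOnReplica int) error {

	attempts := retryOnReplica
	if len(leaders) > attempts {
		attempts = len(leaders) // retry at most max(retryOnReplica, len(shardLeaders)) times
	}

	excludeNodes := make(map[int64]struct{})
	var lastErr error
	for i := 0; i < attempts; i++ {
		nodeID, ok := pickNode(leaders, excludeNodes)
		if !ok {
			// All replicas have failed once: clear the exclude set and start over.
			excludeNodes = make(map[int64]struct{})
			nodeID, _ = pickNode(leaders, excludeNodes)
		}
		client, err := getClient(nodeID)
		if err != nil {
			excludeNodes[nodeID] = struct{}{}
			lastErr = err
			continue
		}
		if err := wl.Exec(ctx, nodeID, client, wl.Channel); err != nil {
			excludeNodes[nodeID] = struct{}{}
			lastErr = err
			continue
		}
		return nil // success
	}
	return lastErr
}

// pickNode returns the first leader not in the exclude set.
func pickNode(leaders []int64, exclude map[int64]struct{}) (int64, bool) {
	for _, id := range leaders {
		if _, skip := exclude[id]; !skip {
			return id, true
		}
	}
	return 0, false
}
```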
## Load Balancers

Two strategies are available for selecting QueryNode replicas:
### RoundRobinBalancer

File: roundrobin_balancer.go
Simple round-robin selection across available nodes. No state tracking, minimal overhead.
Use case: Uniform workload distribution when all nodes have similar capacity
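Conceptually this reduces to an atomic counter modulo the candidate list. A minimal sketch (not the package's actual implementation), using the atomic index mentioned in the concurrency notes below:

```go
import "sync/atomic"

// roundRobinPicker cycles through candidate node IDs with a single atomic counter.
type roundRobinPicker struct {
	idx atomic.Int64
}

// pick returns the next node ID in rotation; callers pass the currently
// available (healthy, non-excluded) nodes.
func (p *roundRobinPicker) pick(nodeIDs []int64) int64 {
	if len(nodeIDs) == 0 {
		return -1 // no candidates
	}
	n := p.idx.Add(1)
	return nodeIDs[int(n%int64(len(nodeIDs)))]
}
```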
### LookAsideBalancer

File: look_aside_balancer.go
Cost-aware load balancer that considers QueryNode workload and health.
Features:
- Collects `CostAggregation` metrics (response time, service time, total NQ) from QueryNodes
- Computes a per-node workload score: `score = executeSpeed + (1 + totalNQ + executingNQ)³ × serviceTime`
- Periodic health checks via the `GetComponentStates` RPC

Configuration Parameters:
- `ProxyCfg.CostMetricsExpireTime`: How long to trust cached cost metrics (default: varies)
- `ProxyCfg.CheckWorkloadRequestNum`: Check workload every N requests (default: varies)
- `ProxyCfg.WorkloadToleranceFactor`: Tolerance for workload difference before preferring a lighter node
- `ProxyCfg.CheckQueryNodeHealthInterval`: Interval for health checks
- `ProxyCfg.HealthCheckTimeout`: Timeout for the health check RPC
- `ProxyCfg.RetryTimesOnHealthCheck`: Failures tolerated before marking a node unreachable

Selection Strategy:
```
if (requestCount % CheckWorkloadRequestNum == 0) {
    // Cost-aware selection
    select node with minimum workload score
    if (maxScore - minScore) / minScore <= WorkloadToleranceFactor {
        fall back to round-robin
    }
} else {
    // Fast path: round-robin
    select next available node
}
```
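For reference, the score formula above reads as: the node's current execute speed plus a service-time term that grows cubically with outstanding work, so a node with a long queue is penalized sharply. A small illustrative computation (the function and parameter names are assumptions, not the balancer's internal code):

```go
// score = executeSpeed + (1 + totalNQ + executingNQ)^3 * serviceTime
func workloadScore(executeSpeed, serviceTime float64, totalNQ, executingNQ int64) float64 {
	queued := 1 + float64(totalNQ) + float64(executingNQ)
	return executeSpeed + queued*queued*queued*serviceTime
}

// Example: executeSpeed=10, serviceTime=0.5, totalNQ=2, executingNQ=1
// → 10 + (1+2+1)³ × 0.5 = 10 + 64 × 0.5 = 42
```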
## Configuration

Key configuration parameters from paramtable:
| Parameter | Path | Description | Default |
|---|---|---|---|
| QueryNodePoolingSize | ProxyCfg.QueryNodePoolingSize | Size of connection pool per QueryNode | 1 |
| RetryTimesOnReplica | ProxyCfg.RetryTimesOnReplica | Max retry times on replica failures | varies |
| ReplicaSelectionPolicy | ProxyCfg.ReplicaSelectionPolicy | Load balancing policy: round_robin or look_aside | look_aside |
| CostMetricsExpireTime | ProxyCfg.CostMetricsExpireTime | Expiration time for cost metrics cache | varies |
| CheckWorkloadRequestNum | ProxyCfg.CheckWorkloadRequestNum | Frequency of workload-aware selection | varies |
| WorkloadToleranceFactor | ProxyCfg.WorkloadToleranceFactor | Tolerance for workload differences | varies |
| CheckQueryNodeHealthInterval | ProxyCfg.CheckQueryNodeHealthInterval | Health check interval | varies |
| HealthCheckTimeout | ProxyCfg.HealthCheckTimeout | Health check RPC timeout | varies |
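When these values are needed programmatically, they are read through paramtable accessors along these lines. This is a sketch: the exact getter method per parameter may differ, and the `pkg/util/paramtable` import path is an assumption.

```go
import "github.com/milvus-io/milvus/pkg/util/paramtable"

func shardClientSettings() (poolSize int, policy string, retryOnReplica int) {
	cfg := &paramtable.Get().ProxyCfg
	poolSize = cfg.QueryNodePoolingSize.GetAsInt()      // connection pool size per QueryNode
	policy = cfg.ReplicaSelectionPolicy.GetValue()      // "round_robin" or "look_aside"
	retryOnReplica = cfg.RetryTimesOnReplica.GetAsInt() // max retries across replicas
	return
}
```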
## Usage Example

```go
import (
	"context"

	"github.com/milvus-io/milvus/internal/proxy/shardclient"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/pkg/proto/querypb"
)

// 1. Create ShardClientMgr with MixCoord client
mgr := shardclient.NewShardClientMgr(mixCoordClient)
mgr.Start() // Start background purge goroutine
defer mgr.Close()

// 2. Create LBPolicy
policy := shardclient.NewLBPolicyImpl(mgr)
policy.Start(ctx) // Start load balancer (health checks, etc.)
defer policy.Close()

// 3. Execute collection workload (e.g., search/query)
workload := shardclient.CollectionWorkLoad{
	Db:             "default",
	CollectionName: "my_collection",
	CollectionID:   12345,
	Nq:             100, // Number of queries
	Exec: func(ctx context.Context, nodeID int64, client types.QueryNodeClient, channel string) error {
		// Perform actual work (search, query, etc.)
		req := &querypb.SearchRequest{ /* build request for this channel */ }
		resp, err := client.Search(ctx, req)
		if err != nil {
			return err
		}
		_ = resp // collect/merge results here
		return nil
	},
}

// Execute on all channels in parallel
err := policy.Execute(ctx, workload)

// Or execute on any single channel (for lightweight ops)
err = policy.ExecuteOneChannel(ctx, workload)
```
## Shard Leader Cache

The shard leader cache stores the mapping of shards to their leader QueryNodes:
```
database → collectionName → shardLeaders {
    collectionID: int64
    shardLeaders: map[channel][]nodeInfo
}
```
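In Go terms, the cache shape is roughly the following. These struct names and fields are a sketch of the structure above, not the exact types in manager.go:

```go
import "sync"

// nodeInfoSketch stands in for the package's unexported nodeInfo type.
type nodeInfoSketch struct {
	nodeID  int64
	address string
}

// shardLeadersSketch mirrors one per-collection cache entry.
type shardLeadersSketch struct {
	collectionID int64
	shardLeaders map[string][]nodeInfoSketch // channel → replica leaders
}

// leaderCacheSketch: database → collectionName → shardLeaders, guarded by an RWMutex.
type leaderCacheSketch struct {
	mu    sync.RWMutex
	cache map[string]map[string]*shardLeadersSketch
}
```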
Cache Operations:
- Cache hits and misses are recorded in metrics (`ProxyCacheStatsCounter`)
- On a cache miss, shard leaders are fetched from the coordinator via `GetShardLeaders`
- `DeprecateShardCache(db, collection)`: Remove a specific collection
- `InvalidateShardLeaderCache(collectionIDs)`: Remove collections by ID (called on shard leader changes)
- `RemoveDatabase(db)`: Remove an entire database

Client Purging:

The ShardClientMgr periodically purges unused clients:
- Every `purgeInterval` (default: 600s), iterate over all cached clients
- Current shard locations are consulted via `ListShardLocation()` to determine which nodes are still in use
- Clients whose inactivity (tracked by `lastActiveTs`) exceeds `expiredDuration` are closed and removed

Error Handling:

Common errors returned by the package:
- `errClosed`: Client is closed (returned when accessing a closed shardClient)
- `merr.ErrChannelNotAvailable`: No available shard leaders for the channel
- `merr.ErrNodeNotAvailable`: Selected node is not available
- `merr.ErrCollectionNotLoaded`: Collection is not loaded in QueryNodes
- `merr.ErrServiceUnavailable`: All available nodes are unreachable

Retry is handled at multiple levels:
- LBPolicy level: retries the workload on other replicas, tracking failed nodes in `excludeNodes` (see Retry Strategy above)
- Balancer level: nodes that fail health checks are marked unreachable and skipped during selection
- gRPC level: transport-level reconnection is handled by the underlying gRPC client
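Callers typically distinguish the errors listed above with errors.Is before deciding whether to refresh the shard cache and retry. A brief hedged example, assuming the merr sentinel errors participate in standard error matching:

```go
import (
	"errors"

	"github.com/milvus-io/milvus/pkg/util/merr"
)

// shouldRefreshShardCache reports whether an error likely indicates a stale
// shard-leader view, in which case the caller would deprecate the cache entry
// and retry. This is an illustrative helper, not part of the package.
func shouldRefreshShardCache(err error) bool {
	return errors.Is(err, merr.ErrChannelNotAvailable) ||
		errors.Is(err, merr.ErrNodeNotAvailable) ||
		errors.Is(err, merr.ErrServiceUnavailable)
}
```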
## Metrics

The package exports several metrics:
- `ProxyCacheStatsCounter`: Shard leader cache hit/miss statistics
  - Labels: `nodeID`, `method` (GetShard/GetShardLeaderList), `status` (hit/miss)
- `ProxyUpdateCacheLatency`: Latency of updating the shard leader cache
  - Labels: `nodeID`, `method`

## Testing

The package includes extensive test coverage:
- `shard_client_test.go`: Tests for connection pool management
- `manager_test.go`: Tests for cache management and client lifecycle
- `lb_policy_test.go`: Tests for retry logic and workload execution
- `roundrobin_balancer_test.go`: Tests for round-robin selection
- `look_aside_balancer_test.go`: Tests for cost-aware selection and health checks

Mock interfaces (via mockery):
- `mock_shardclient_manager.go`: Mock ShardClientMgr
- `mock_lb_policy.go`: Mock LBPolicy
- `mock_lb_balancer.go`: Mock LBBalancer

## Concurrency

All components are designed for concurrent access:
- `shardClientMgrImpl`: Uses `sync.RWMutex` for the cache, `typeutil.ConcurrentMap` for clients
- `shardClient`: Uses `sync.RWMutex` and atomic operations
- `LookAsideBalancer`: Uses `typeutil.ConcurrentMap` for all mutable state
- `RoundRobinBalancer`: Uses `atomic.Int64` for its index

## Related Components

- Proxy (`internal/proxy/`): Uses shardclient to route search/query requests to QueryNodes
- QueryCoord (`internal/querycoordv2/`): Provides shard leader information via the GetShardLeaders RPC
- QueryNode (`internal/querynodev2/`): Receives and processes requests routed by shardclient
- Registry (`internal/registry/`): Provides client creation functions for gRPC connections

Potential areas for enhancement:
shardClientMgrImpl: Uses sync.RWMutex for cache, typeutil.ConcurrentMap for clientsshardClient: Uses sync.RWMutex and atomic operationsLookAsideBalancer: Uses typeutil.ConcurrentMap for all mutable stateRoundRobinBalancer: Uses atomic.Int64 for indexinternal/proxy/): Uses shardclient to route search/query requests to QueryNodesinternal/querycoordv2/): Provides shard leader information via GetShardLeaders RPCinternal/querynodev2/): Receives and processes requests routed by shardclientinternal/registry/): Provides client creation functions for gRPC connectionsPotential areas for enhancement: