docs/optimization-plan.md
Issue: #1793 - High CPU and Memory Usage on System-Constrained Environments
Date: February 3, 2026
Root Cause Analysis: Completed
Proposed Solution: Combined optimization approach across multiple components
Investigation into issue #1793 revealed that the original worker pool proposal addressed the symptoms but not the root causes. The actual sources of resource exhaustion are:

- `AllResources` map (`core/cautils/datastructures.go:53`)
- `ResourcesResult` map (`core/cautils/datastructures.go:54`)
- Temporary data structures in `getKubernetesObjects`
- OPA module compilation (`core/pkg/opaprocessor/processorhandler.go:324-330`)
- 6-level nested loops (`core/pkg/opaprocessor/processorhandlerutils.go:136-167`)
- O(n) slice operations:
  - `slices.Contains()` for deduplication in image scanning
  - `RelatedResourcesIDs` slice growth with O(n) membership checks

This document outlines a phased approach to reduce memory usage by 40-60% and CPU usage by 30-50%.

The team is already aware of this issue, with internal documentation acknowledging the problem:
```go
// isLargeCluster returns true if the cluster size is larger than the largeClusterSize
// This code is a workaround for large clusters. The final solution will be to scan resources individually
// Source: core/pkg/opaprocessor/processorhandlerutils.go:279
```
## Phase 1: OPA Compilation Caching

Objective: Eliminate redundant rule compilations
Files Modified:

- core/pkg/opaprocessor/processorhandler.go
- core/pkg/opaprocessor/processorhandler_test.go

Changes:
```go
type OPAProcessor struct {
	// existing fields...
	compiledModules map[string]*ast.Compiler
	compiledMu      sync.RWMutex
}

func (opap *OPAProcessor) getCompiledRule(ctx context.Context, rule reporthandling.Rule, modules map[string]string) (*ast.Compiler, error) {
	// Check cache with read lock
	cacheKey := rule.Name + "|" + rule.Rule
	opap.compiledMu.RLock()
	if compiled, ok := opap.compiledModules[cacheKey]; ok {
		opap.compiledMu.RUnlock()
		return compiled, nil
	}
	opap.compiledMu.RUnlock()

	// Compile new module with write lock
	opap.compiledMu.Lock()
	defer opap.compiledMu.Unlock()

	// Double-check pattern (cache might have been filled)
	if compiled, ok := opap.compiledModules[cacheKey]; ok {
		return compiled, nil
	}

	compiled, err := ast.CompileModulesWithOpt(modules, ast.CompileOpts{
		EnablePrintStatements: opap.printEnabled,
		ParserOptions:         ast.ParserOptions{RegoVersion: ast.RegoV0},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to compile rule '%s': %w", rule.Name, err)
	}
	opap.compiledModules[cacheKey] = compiled
	return compiled, nil
}
```
Integration Point: Replace the direct compilation call in `runRegoOnK8s()` (processorhandler.go:338) with the cached retrieval
Testing:
Expected Savings: 30-40% CPU reduction
Risk: Low - caching is a well-known pattern, minimal behavior change
Dependencies: None
## Phase 2: Map Pre-sizing

Objective: Reduce memory allocations and fragmentation
Files Modified:

- core/cautils/datastructures.go
- core/cautils/datastructures_test.go
- core/pkg/resourcehandler/handlerpullresources.go
- core/pkg/resourcehandler/k8sresources.go

Changes:
```go
func NewOPASessionObj(ctx context.Context, frameworks []reporthandling.Framework, k8sResources K8SResources, scanInfo *ScanInfo) *OPASessionObj {
	clusterSize := estimateClusterSize(k8sResources)
	if clusterSize < 100 {
		clusterSize = 100
	}
	return &OPASessionObj{
		AllResources:    make(map[string]workloadinterface.IMetadata, clusterSize),
		ResourcesResult: make(map[string]resourcesresults.Result, clusterSize),
		// ... other pre-sized collections
	}
}

func (k8sHandler *K8sResourceHandler) pullResources(queryableResources QueryableResources, ...) (K8SResources, map[string]workloadinterface.IMetadata, map[string]workloadinterface.IMetadata, map[string]map[string]bool, int, error) {
	// ... existing code ...
	return k8sResources, allResources, externalResources, excludedRulesMap, estimatedCount, nil
}

func CollectResources(ctx context.Context, rsrcHandler IResourceHandler, opaSessionObj *cautils.OPASessionObj, ...) error {
	resourcesMap, allResources, externalResources, excludedRulesMap, estimatedCount, err := rsrcHandler.GetResources(ctx, opaSessionObj, scanInfo)
	// Re-initialize with proper size
	if opaSessionObj.AllResources == nil {
		opaSessionObj = cautils.NewOPASessionObj(estimatedCount)
	}
	opaSessionObj.K8SResources = resourcesMap
	opaSessionObj.AllResources = allResources
	// ...
}
```
Testing:
Expected Savings: 10-20% memory reduction, reduced GC pressure
Risk: Low - Go's make() with capacity hint is well-tested
Dependencies: None
## Phase 3: Set-Based Deduplication

Objective: Replace O(n) slice operations with O(1) set operations
Files Modified:

- core/pkg/utils/dedup.go (new file)
- core/core/scan.go
- core/pkg/opaprocessor/processorhandler.go

Changes:
```go
// core/pkg/utils/dedup.go
package utils

import "sync"

type StringSet struct {
	items map[string]struct{}
	mu    sync.RWMutex
}

func NewStringSet() *StringSet {
	return &StringSet{
		items: make(map[string]struct{}),
	}
}

func (s *StringSet) Add(item string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[item] = struct{}{}
}

func (s *StringSet) AddAll(items []string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, item := range items {
		s.items[item] = struct{}{}
	}
}

func (s *StringSet) Contains(item string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.items[item]
	return ok
}

func (s *StringSet) ToSlice() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	result := make([]string, 0, len(s.items))
	for item := range s.items {
		result = append(result, item)
	}
	return result
}
```
In `core/core/scan.go` (line 249):

```go
func scanImages(scanType cautils.ScanTypes, scanData *cautils.OPASessionObj, ...) {
	imagesToScan := utils.NewStringSet()
	for _, workload := range scanData.AllResources {
		containers, err := workloadinterface.NewWorkloadObj(workload.GetObject()).GetContainers()
		if err != nil {
			logger.L().Error(...)
			continue
		}
		for _, container := range containers {
			// Add is idempotent, so no separate Contains check is needed
			imagesToScan.Add(container.Image)
		}
	}
	// Use imagesToScan.ToSlice() for iteration
}
```
In `core/pkg/opaprocessor/processorhandler.go` (line 261):

```go
relatedResourcesIDs := utils.NewStringSet()

// Inside the loop
if !relatedResourcesIDs.Contains(wl.GetID()) {
	relatedResourcesIDs.Add(wl.GetID())
	// ... process related resource
}
```
Testing:
Expected Savings: 5-10% CPU reduction for large clusters
Risk: Low - thread-safe set implementation, minimal behavior change
Dependencies: None
## Phase 4: getKubernetesObjects Caching

Objective: Eliminate repeated computation of resource groupings
Files Modified:

- core/pkg/opaprocessor/processorhandler.go
- core/pkg/opaprocessor/processorhandlerutils.go
- core/pkg/opaprocessor/processorhandler_test.go

Changes:
```go
type OPAProcessor struct {
	// existing fields...
	k8sObjectsCache map[string]map[string][]workloadinterface.IMetadata
	k8sObjectsMu    sync.RWMutex
}

func (opap *OPAProcessor) getCacheKey(match []reporthandling.RuleMatchObjects) string {
	// Named keys rather than strings, to avoid shadowing the strings package.
	var keys []string
	for _, m := range match {
		for _, group := range m.APIGroups {
			for _, version := range m.APIVersions {
				for _, resource := range m.Resources {
					keys = append(keys, fmt.Sprintf("%s/%s/%s", group, version, resource))
				}
			}
		}
	}
	sort.Strings(keys)
	return strings.Join(keys, "|")
}

func (opap *OPAProcessor) getKubernetesObjectsCached(k8sResources cautils.K8SResources, match []reporthandling.RuleMatchObjects) map[string][]workloadinterface.IMetadata {
	cacheKey := opap.getCacheKey(match)

	// Try cache
	opap.k8sObjectsMu.RLock()
	if cached, ok := opap.k8sObjectsCache[cacheKey]; ok {
		opap.k8sObjectsMu.RUnlock()
		return cached
	}
	opap.k8sObjectsMu.RUnlock()

	// Compute new value
	result := getKubernetesObjects(k8sResources, opap.AllResources, match)

	// Store in cache
	opap.k8sObjectsMu.Lock()
	opap.k8sObjectsCache[cacheKey] = result
	opap.k8sObjectsMu.Unlock()
	return result
}
```
Testing:
Expected Savings: 10-15% CPU reduction
Risk: Low-Medium - cache invalidation would normally be a concern, but resources are static for the duration of a scan, so no invalidation logic is required
Dependencies: None
## Phase 5: Resource Streaming

Objective: Process resources in batches instead of loading all at once
Files Modified:

- core/pkg/resourcehandler/k8sresources.go
- core/pkg/resourcehandler/interface.go
- core/pkg/resourcehandler/filesloader.go
- core/pkg/opaprocessor/processorhandler.go
- cmd/scan/scan.go

Changes:
```go
// core/pkg/resourcehandler/interface.go
type IResourceHandler interface {
	GetResources(...) (...)
	StreamResources(ctx context.Context, batchSize int) (<-chan workloadinterface.IMetadata, error)
}

func (k8sHandler *K8sResourceHandler) StreamResources(ctx context.Context, batchSize int) (<-chan workloadinterface.IMetadata, error) {
	ch := make(chan workloadinterface.IMetadata, batchSize)
	go func() {
		defer close(ch)
		queryableResources := k8sHandler.getQueryableResources()
		for i := range queryableResources {
			select {
			case <-ctx.Done():
				return
			default:
				apiGroup, apiVersion, resource := k8sinterface.StringToResourceGroup(queryableResources[i].GroupVersionResourceTriplet)
				gvr := schema.GroupVersionResource{Group: apiGroup, Version: apiVersion, Resource: resource}
				result, err := k8sHandler.pullSingleResource(&gvr, nil, queryableResources[i].FieldSelectors, nil)
				if err != nil {
					continue
				}
				metaObjs := ConvertMapListToMeta(k8sinterface.ConvertUnstructuredSliceToMap(result))
				for _, metaObj := range metaObjs {
					select {
					case ch <- metaObj:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()
	return ch, nil
}
```
```go
func (opap *OPAProcessor) ProcessWithStreaming(ctx context.Context, policies *cautils.Policies, resourceStream <-chan workloadinterface.IMetadata, batchSize int) error {
	batch := make([]workloadinterface.IMetadata, 0, batchSize)
	opaSessionObj := cautils.NewOPASessionObj(batchSize)

	// Collect batch
	done := false
	for !done {
		select {
		case resource, ok := <-resourceStream:
			if !ok {
				done = true
				break
			}
			batch = append(batch, resource)
			if len(batch) >= batchSize {
				opaSessionObj.AllResources = batchToMap(batch)
				if err := opap.ProcessBatch(ctx, policies); err != nil {
					return err
				}
				batch = batch[:0] // Clear batch
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// Process remaining batch
	if len(batch) > 0 {
		opaSessionObj.AllResources = batchToMap(batch)
		if err := opap.ProcessBatch(ctx, policies); err != nil {
			return err
		}
	}
	return nil
}
```
```go
// cmd/scan/scan.go
scanCmd.PersistentFlags().BoolVar(&scanInfo.StreamMode, "stream-resources", false, "Process resources in batches (lower memory, slightly slower)")
scanCmd.PersistentFlags().IntVar(&scanInfo.StreamBatchSize, "stream-batch-size", 100, "Batch size for resource streaming (lower = less memory)")

func shouldEnableStreaming(scanInfo *cautils.ScanInfo, estimatedClusterSize int) bool {
	if scanInfo.StreamMode {
		return true
	}
	largeClusterSize, _ := cautils.ParseIntEnvVar("LARGE_CLUSTER_SIZE", 2500)
	if estimatedClusterSize > largeClusterSize {
		logger.L().Info("Large cluster detected, enabling streaming mode")
		return true
	}
	return false
}
```
Testing:
Expected Savings: 30-50% memory reduction for large clusters
Risk: Medium - significant behavior change, needs thorough testing
Dependencies: Phase 2 (map pre-sizing)
## Phase 6: Early Resource Cleanup

Objective: Free memory promptly after resources are processed
Files Modified:

- core/pkg/opaprocessor/processorhandler.go
- core/pkg/opaprocessor/processorhandlerutils.go

Changes:
```go
func (opap *OPAProcessor) Process(ctx context.Context, policies *cautils.Policies, progressListener IJobProgressNotificationClient) error {
	resourcesRemaining := make(map[string]bool)
	for id := range opap.AllResources {
		resourcesRemaining[id] = true
	}
	for _, toPin := range policies.Controls {
		control := toPin
		resourcesAssociatedControl, err := opap.processControl(ctx, &control)
		if err != nil {
			logger.L().Ctx(ctx).Warning(err.Error())
		}
		// Clean up processed resources if not needed for future controls
		if len(policies.Controls) > 10 && !isLargeCluster(len(opap.AllResources)) {
			for id := range resourcesAssociatedControl {
				if resourcesRemaining[id] {
					delete(resourcesRemaining, id)
					// Remove from AllResources
					if resource, ok := opap.AllResources[id]; ok {
						removeData(resource)
						delete(opap.AllResources, id)
					}
				}
			}
		}
	}
	return nil
}
```
Testing:
Expected Savings: 10-20% memory reduction, reduced peak memory usage
Risk: Medium - needs careful tracking of which resources are still needed
Dependencies: Phase 5 (resource streaming)
| Phase | Risk Level | Mitigation Strategy |
|---|---|---|
| 1 - OPA Caching | Low | Comprehensive unit tests, fallback to uncached mode |
| 2 - Map Pre-sizing | Low | Backward compatible, capacity hints are safe |
| 3 - Set Dedup | Low | Thread-safe implementation, comprehensive tests |
| 4 - getK8SCache | Low-Medium | Cache key validation, cache invalidation logic |
| 5 - Streaming | Medium | Feature flag (disable by default), extensive integration tests |
| 6 - Early Cleanup | Medium | Track resource dependencies, thorough validation |
```shell
# Manual streaming mode
kubescape scan framework all --stream-resources --stream-batch-size 50

# Auto-detection (default)
kubescape scan framework all  # Automatically enables streaming for large clusters

# Environment variable
export KUBESCAPE_STREAM_BATCH_SIZE=100
```
All changes are backward compatible.
- github.com/open-policy-agent/opa/ast - OPA compilation (Phase 1)
- github.com/kubescape/opa-utils - Existing dependencies maintained

No new external dependencies required.
| Component | File | Line | Notes |
|---|---|---|---|
| AllResources initialization | core/cautils/datastructures.go | 80-81 | Map pre-sizing target |
| OPA compilation | core/pkg/opaprocessor/processorhandler.go | 324-330 | Most CPU-intensive operation |
| getKubernetesObjects | core/pkg/opaprocessor/processorhandlerutils.go | 136-167 | 6-level nested loops |
| Resource collection | core/pkg/resourcehandler/k8sresources.go | 313-355 | Loads all resources |
| Image deduplication | core/core/scan.go | 249 | O(n) slice.Contains |
| Throttle package (unused) | core/pkg/throttle/throttle.go | - | Could be repurposed |
Document Version: 1.0
Prepared by: Code Investigation Team
Review Status: Awaiting stakeholder approval