content/v2.2/operations/watchoperation.md
A WatchOperation creates [Operations]({{<ref "operation">}}) when watched
Kubernetes resources change. Use WatchOperations for reactive operational
workflows such as backing up databases before deletion, validating
configurations after updates, or triggering alerts when resources fail.
WatchOperations watch specific Kubernetes resources and create new Operations whenever those resources change. The changed resource is automatically injected into the Operation for the function to process.
apiVersion: ops.crossplane.io/v1alpha1
kind: WatchOperation
metadata:
name: config-validator
spec:
watch:
apiVersion: v1
kind: ConfigMap
matchLabels:
validate: "true"
concurrencyPolicy: Allow
operationTemplate:
spec:
mode: Pipeline
pipeline:
- step: validate
functionRef:
name: function-config-validator
input:
apiVersion: fn.crossplane.io/v1beta1
kind: ConfigValidatorInput
rules:
- required: ["database.url", "database.port"]
- format: "email"
field: "notification.email"
- step: notify
functionRef:
name: function-slack-notifier
input:
apiVersion: fn.crossplane.io/v1beta1
kind: SlackNotifierInput
channel: "#alerts"
severity: "warning"
{{<hint "important">}}
WatchOperations are an alpha feature. You must enable Operations by adding
--enable-operations to Crossplane's arguments.
{{</hint>}}
WatchOperations can watch any Kubernetes resource with flexible filtering:
spec:
watch:
apiVersion: apps/v1
kind: Deployment
spec:
watch:
apiVersion: v1
kind: ConfigMap
namespace: production
spec:
watch:
apiVersion: example.org/v1
kind: Database
matchLabels:
backup: "enabled"
environment: "production"
spec:
watch:
apiVersion: v1
kind: Node
matchLabels:
node-role.kubernetes.io/worker: ""
When a WatchOperation creates an Operation, it automatically injects the changed
resource using the special requirement name
ops.crossplane.io/watched-resource. Functions can access this resource without
explicitly requesting it.
For example, when a ConfigMap with label validate: "true" changes, the
WatchOperation creates an Operation like this:
apiVersion: ops.crossplane.io/v1alpha1
kind: Operation
metadata:
name: config-validator-abc123
spec:
mode: Pipeline
pipeline:
- step: validate
functionRef:
name: function-config-validator
requirements:
requiredResources:
- requirementName: ops.crossplane.io/watched-resource
apiVersion: v1
kind: ConfigMap
name: my-config
namespace: default
# ... other pipeline steps from operationTemplate
The watched resource is automatically available to functions in
req.required_resources under the special name
ops.crossplane.io/watched-resource.
WatchOperations support the same concurrency policies as CronOperations:
{{<hint "note">}} The following examples use hypothetical functions for illustration. At launch, only function-python supports operations. {{</hint>}}
Validate ConfigMaps when they change:
apiVersion: ops.crossplane.io/v1alpha1
kind: WatchOperation
metadata:
name: config-validator
spec:
watch:
apiVersion: v1
kind: ConfigMap
matchLabels:
validate: "true"
operationTemplate:
spec:
mode: Pipeline
pipeline:
- step: validate-config
functionRef:
name: function-config-validator
input:
apiVersion: fn.crossplane.io/v1beta1
kind: ConfigValidatorInput
rules:
- required: ["database.host", "database.port"]
- format: "email"
field: "notification.email"
Backup databases before they're deleted:
apiVersion: ops.crossplane.io/v1alpha1
kind: WatchOperation
metadata:
name: backup-on-deletion
spec:
watch:
apiVersion: rds.aws.m.upbound.io/v1beta1
kind: Instance
# Note: Watching for deletion requires function logic
# to check deletion timestamp
operationTemplate:
spec:
mode: Pipeline
pipeline:
- step: create-backup
functionRef:
name: function-rds-backup
input:
apiVersion: fn.crossplane.io/v1beta1
kind: RDSBackupInput
retentionDays: 30
Alert when resources enter a failed state:
apiVersion: ops.crossplane.io/v1alpha1
kind: WatchOperation
metadata:
name: failure-alerts
spec:
watch:
apiVersion: example.org/v1
kind: App
matchLabels:
alert: "enabled"
operationTemplate:
spec:
mode: Pipeline
pipeline:
- step: check-status
functionRef:
name: function-status-checker
input:
apiVersion: fn.crossplane.io/v1beta1
kind: StatusCheckerInput
alertConditions:
- type: "Ready"
status: "False"
- step: send-alert
functionRef:
name: function-alertmanager
input:
apiVersion: fn.crossplane.io/v1beta1
kind: AlertInput
severity: "critical"
Complex resource watching with multiple conditions:
# Watch Deployments in specific namespaces with multiple label conditions
apiVersion: ops.crossplane.io/v1alpha1
kind: WatchOperation
metadata:
name: multi-condition-watcher
spec:
watch:
apiVersion: apps/v1
kind: Deployment
namespace: production # Only production namespace
matchLabels:
app.kubernetes.io/managed-by: "crossplane"
environment: "prod"
backup-required: "true"
operationTemplate:
spec:
mode: Pipeline
pipeline:
- step: backup-deployment
functionRef:
name: function-deployment-backup
# Watch custom resources across all namespaces
apiVersion: ops.crossplane.io/v1alpha1
kind: WatchOperation
metadata:
name: database-lifecycle-manager
spec:
watch:
apiVersion: database.example.io/v1
kind: PostgreSQLInstance
# No namespace specified = watch all namespaces
matchLabels:
lifecycle-management: "enabled"
operationTemplate:
spec:
mode: Pipeline
pipeline:
- step: lifecycle-check
functionRef:
name: function-database-lifecycle
input:
apiVersion: fn.crossplane.io/v1beta1
kind: DatabaseLifecycleInput
checkDeletionTimestamp: true
autoBackup: true
WatchOperations can watch one resource type and dynamically fetch related resources. Here's a WatchOperation that watches Ingresses and manages certificates:
apiVersion: ops.crossplane.io/v1alpha1
kind: WatchOperation
metadata:
name: ingress-certificate-manager
spec:
watch:
apiVersion: networking.k8s.io/v1
kind: Ingress
matchLabels:
auto-cert: "enabled"
operationTemplate:
spec:
mode: Pipeline
pipeline:
- step: manage-certificates
functionRef:
name: function-cert-manager
input:
apiVersion: fn.crossplane.io/v1beta1
kind: CertManagerInput
issuer: "letsencrypt-prod"
renewBefore: "720h" # 30 days
The function examines the watched Ingress and dynamically requests related resources:
from crossplane.function import request, response
def operate(req, rsp):
# Access the watched Ingress resource
ingress = request.get_required_resource(req, "ops.crossplane.io/watched-resource")
if not ingress:
response.fatal(rsp, "No watched resource found")
return
# Extract the service name from the Ingress backend
rules = ingress.get("spec", {}).get("rules", [])
if not rules:
response.fatal(rsp, "Could not extract service name from ingress")
return
backend = rules[0].get("http", {}).get("paths", [{}])[0].get("backend", {})
service_name = backend.get("service", {}).get("name")
if not service_name:
response.fatal(rsp, "Could not extract service name from ingress")
return
ingress_namespace = ingress.get("metadata", {}).get("namespace", "default")
# CRITICAL: Always request the same resources to ensure requirement
# stabilization. Crossplane calls the function repeatedly until
# requirements don't change.
response.require_resources(
rsp,
name="related-service",
api_version="v1",
kind="Service",
match_name=service_name,
namespace=ingress_namespace
)
# Check if the service is available and process accordingly
service = request.get_required_resource(req, "related-service")
if service:
# Success: Both resources available
response.set_output(rsp, {
"status": "success",
"message": "Certificate management completed",
"ingress_host": ingress.get("spec", {}).get("rules", [{}])[0].get("host"),
"service_name": service.get("metadata", {}).get("name")
})
return
# Waiting: Service not available yet
response.set_output(rsp, {
"status": "waiting",
"message": f"Waiting for service '{service_name}' to be available"
})
{{<hint "important">}}
Critical resource stabilization pattern: functions must return the same
requirements in each iteration to signal completion. The function in the
preceding example always calls response.require_resources() regardless of
whether the service exists. This ensures Crossplane knows when to stop calling
the function.
Common mistake: only requesting resources when missing breaks the stabilization contract and causes timeout errors. {{</hint>}}
This pattern allows functions to:
response.require_resources()WatchOperations provide status information about watching:
status:
conditions:
- type: Synced
status: "True"
reason: ReconcileSuccess
- type: Watching
status: "True"
reason: WatchActive
watchingResources: 12
runningOperationRefs:
- name: config-validator-anjda
- name: config-validator-f0d92
Key status fields:
True when the WatchOperation is actively watching resources, False when paused or failedwatchingResources: Number of resources under watchrunningOperationRefs: Running Operations created by this WatchOperationWatchOperations emit events for important activities:
EstablishWatched (Warning) - Watch establishment failuresTerminateWatched (Warning) - Watch termination failuresGarbageCollectOperations (Warning) - Operation cleanup failuresCreateOperation (Warning) - Operation creation failuresReplaceRunningOperation (Warning) - Operation replacement failuresMonitor WatchOperations using:
<!-- vale write-good.TooWordy = YES --># Check WatchOperation status
kubectl get watchoperation my-watchop
# View recent Operations created by the WatchOperation
kubectl get operations -l crossplane.io/watchoperation=my-watchop
# Check watched resource count
kubectl describe watchoperation my-watchop
# Check events
kubectl get events --field-selector involvedObject.name=my-watchop
Like CronOperations, WatchOperations automatically clean up completed Operations:
apiVersion: ops.crossplane.io/v1alpha1
kind: WatchOperation
metadata:
name: config-validator
spec:
watch:
apiVersion: v1
kind: ConfigMap
successfulHistoryLimit: 10 # Keep 10 successful Operations (default: 3)
failedHistoryLimit: 5 # Keep 5 failed Operations (default: 1)
operationTemplate:
# Operation template here
WatchOperations automatically inject the changed resource into the created
Operation using a special requirement name
ops.crossplane.io/watched-resource:
from crossplane.function import request, response
def operate(req, rsp):
# Access the resource that triggered this Operation
watched_resource = request.get_required_resource(req, "ops.crossplane.io/watched-resource")
if not watched_resource:
response.set_output(rsp, {"error": "No watched resource found"})
return
# Process based on the watched resource
if watched_resource["kind"] == "ConfigMap":
config_data = watched_resource["data"]
# Validate configuration...
The watched resource is available in the function's required_resources map
without needing to declare it in the Operation template.
For general Operations best practices including function development and operational considerations, see [Operation best practices]({{<ref "operation#best-practices">}}).
Watching=True conditionForbid or Replace concurrency policyoperationops.crossplane.io/watched-resource