Source: .vbw-planning/milestones/07-rails-audit-and-refactoring/04-job-pipeline-reliability/04-01-RESEARCH.md
Job & Pipeline Reliability Audit (Phase 04)

Current State

Job Inventory (11 jobs, 3 queues)

| Job | Queue | Lines | Purpose | Error Handling |
| --- | --- | --- | --- | --- |
| FetchFeedJob | fetch | 147 | Fetch single source's feed | Robust retry/circuit (S2) |
| ScheduleFetchesJob | fetch | 32 | Enqueue due fetches | Basic logging only |
| ScrapeItemJob | scrape | 67 | Scrape single item | Basic try/catch + raise |
| SourceHealthCheckJob | maintenance | 85 | Health probe on source | Logs errors, records failures |
| ImportSessionHealthCheckJob | maintenance | 100 | Health-check import candidates | Deadlock rescue present; redundant inline rescue (S1) |
| ItemCleanupJob | maintenance | 48 | Prune items by age/count | No error handling |
| LogCleanupJob | fetch | 18 | Remove old logs | No error handling found in file |
| ImportOpmlJob | maintenance | 160 | OPML import | Try/catch; accumulates errors in arrays |
| FaviconFetchJob | maintenance | 73 | Fetch favicon | Deadlock rescue + broad StandardError |
| DownloadContentImagesJob | maintenance | 75 | Download item images | Deadlock rescue in loop, broad catch |
| ApplicationJob | N/A | 18 | Base class | None |

Total job code: ~824 lines

Pipeline Component Line Counts

| Component | Lines | Error Handling |
| --- | --- | --- |
| FetchRunner | 157 | Advisory lock + context manager (try/catch/ensure) |
| FeedFetcher | 408 | Typed errors + Result pattern + retry decision |
| RetryPolicy | 90 | Decision struct; classifies error types |
| AdaptiveInterval | 141 | Interval calculation only; no error handling |
| SourceUpdater | 200 | State updates; no error handling |
| EntryProcessor | 89 | Iteration; delegates to ItemCreator |
| ItemCreator | 174 | Result pattern; save! on updates |
| RetentionHandler | 30 | Catches StandardError, logs, returns nil |
| FollowUpHandler | 43 | Per-item begin/rescue, logs, continues |
| EventPublisher | 20 | No error handling (dispatcher handles it) |
| Scheduler (fetch) | 86 | No error handling (instrumentation only) |
| Scheduler (scrape) | 41 | No error handling |
| Enqueuer (scrape) | 163 | Result pattern with statuses; no exceptions |
| Retention Pruner | 147 | Result pattern; batches via in_batches; no error handling |

Total pipeline code: ~1,786 lines


Audit Findings Detail

S1: ImportSessionHealthCheckJob Lacks Deadlock Rescue

Location: app/jobs/source_monitor/import_session_health_check_job.rb:4-100

Status: ALREADY FIXED (lines 13-16)

The job DOES include rescue_from ActiveRecord::Deadlocked with a jitter-backoff retry:

```ruby
rescue_from ActiveRecord::Deadlocked do |error|
  Rails.logger&.warn("[SourceMonitor::ImportSessionHealthCheckJob] Deadlock: #{error.message}")
  retry_job(wait: 2.seconds + rand(3).seconds)
end
```

However, lines 58-59 contain a redundant inline rescue inside `perform` that catches `ActiveRecord::Deadlocked` only to re-raise it:

```ruby
rescue ActiveRecord::Deadlocked
  raise  # immediately re-raised; the rescue_from handler then takes over
end
```

An inline rescue is lexically closer than `rescue_from`, so it runs first; because it only re-raises, it is a no-op, and the `rescue_from` handler at line 13 still receives the error. The inline rescue therefore adds nothing.

Recommendation: Remove the redundant rescue at lines 58-59 to clarify intent.


S2: FetchFeedJob Has Complex Retry Logic That Should Be a Service

Location: app/jobs/source_monitor/fetch_feed_job.rb:4-146

Current State: The job contains 147 lines, with ~60 lines of custom retry/circuit-breaker logic:

  • handle_concurrency_error (lines 35-46) — concurrency backoff + max attempts
  • handle_transient_error (lines 69-86) — delegates to RetryPolicy
  • enqueue_retry! (lines 89-105) — enqueues delayed retry + updates source state
  • open_circuit! (lines 107-119) — opens circuit breaker + updates state
  • reset_retry_state! (lines 121-130) — resets retry counter

Issue: This logic mixes job concerns (deserialization + delegation) with domain logic (retry orchestration, circuit-breaker state management).

Current Pattern: RetryPolicy (85 lines) makes decisions; FetchFeedJob executes them. Retry execution touches Source model 3+ times:

```ruby
source.with_lock do
  source.reload
  source.update!(fetch_retry_attempt:, next_fetch_at:, ...)
end
```

Recommendation: Extract to FetchRetryOrchestrator service (or similar) that:

  1. Takes a decision from RetryPolicy
  2. Updates source state
  3. Enqueues the retry job
  4. Returns success/failure

This centralizes retry orchestration outside the job, making it reusable and testable.


S3: Some Jobs Contain Too Much Business Logic

Pattern Violations Found:

a) ImportOpmlJob (160 lines) — Too Much Domain Logic

  • Lines 50-85: process_entry contains business logic:
    • Duplicate detection (lines 60-69)
    • Source creation + validation (line 71)
    • Error accumulation (lines 78-84)
    • Favicon enqueueing (line 75)
  • Should delegate to a single ImportSessionProcessor service that returns a result

b) ImportSessionHealthCheckJob (100 lines) — Complex State Updates

  • Lines 28-53: Locks session, mutates parsed_sources array, recalculates selected_source_ids
  • Lines 51-57: Broadcasts partial updates
  • Should delegate to an ImportSessionUpdater service

c) ScrapeItemJob (67 lines) — Scraping Orchestration + Rate Limiting

  • Lines 42-52: Rate limiting logic (time-based + per-source quota)
  • Lines 21-27: Deferral logic with retry scheduling
  • Should delegate to Scraping::RateLimiter (or similar)

Current Pattern: All three jobs do perform shallow delegation to services (ItemScraper, HealthCheckBroadcaster, etc.), but they coordinate too many concerns. Job should be:

```ruby
def perform(id)
  record = Model.find_by(id:)
  return unless record

  Service.call(record)  # ← Only this
end
```

Instead, they have state mutation, error handling, and re-enqueueing inside the job.


S4: EventPublisher, RetentionHandler, FollowUpHandler, Scheduler Lack Result Pattern

Current State:

| Component | Has Result? | Pattern |
| --- | --- | --- |
| EventPublisher | No | Direct dispatch, no return value (20 lines) |
| RetentionHandler | No | Catches StandardError, logs, returns nil (30 lines) |
| FollowUpHandler | No | Per-item rescue, logs errors, continues (43 lines) |
| Scheduler (fetch) | No | No error handling; returns count (86 lines) |
| Scheduler (scrape) | No | No error handling; returns count (41 lines) |
| ItemCreator | Yes | Result struct with status (:created/:updated/:unchanged) |
| RetentionPruner | Yes | Result struct with counts (removed_by_age, removed_by_limit, removed_total) |
| Scraping::Enqueuer | Yes | Result struct with status + message |
| FeedFetcher | Yes | Result struct with status + error + retry_decision |
| ItemScraper | Yes | Result struct with status + item + log + error |

Issue: Completion handlers (RetentionHandler, FollowUpHandler, EventPublisher) and schedulers have no way to signal:

  • Whether they succeeded or failed
  • What errors occurred (only logged)
  • Whether they should trigger alerting/retry

Example Gap — RetentionHandler:

```ruby
def call(source:, result:)
  pruner.call(...)
rescue StandardError => error
  Rails.logger.error("[SourceMonitor] Retention pruning failed...")
  nil  # ← No signal to caller that this failed
end
```

If retention fails, FetchRunner has no way to know, and the source continues as if pruning succeeded.


S5: Pipeline Error Paths Have Inconsistent Logging

Current Logging Patterns:

| Component | Log Level | Format | Consistency |
| --- | --- | --- | --- |
| FetchFeedJob | error | `[SourceMonitor::FetchFeedJob] message` | ✓ Consistent |
| ImportSessionHealthCheckJob | error | `[SourceMonitor::ImportSessionHealthCheckJob] message` | ✓ Consistent |
| ScrapeItemJob | info | `[SourceMonitor::ManualScrape] payload.to_json` | ✗ Wrong level (should be error for failures) |
| SourceHealthCheckJob | error | `[SourceMonitor::SourceHealthCheckJob] message` | ✓ Consistent |
| RetentionHandler | error | `[SourceMonitor] Retention pruning failed...` | ✗ Missing job/handler name |
| FollowUpHandler | error | `[SourceMonitor] FollowUpHandler: failed...` | ✗ Inconsistent with other handlers |
| FeedFetcher | instrumentation | Struct fields + events | ✓ Structured |
| RetryPolicy | none | No logging | ✗ Silent failures possible |

Issues:

  1. ScrapeItemJob logs at info level even for errors (should be warn or error)
  2. RetentionHandler doesn't identify itself in the log prefix
  3. FollowUpHandler manually constructs the prefix instead of using a consistent format
  4. Schedulers (fetch, scrape) don't log at all — only instrument via notifications
  5. Retry decisions (RetryPolicy) are silent — hard to debug why a retry happened or didn't

S6: FaviconFetchJob and DownloadContentImagesJob Don't Classify Transient vs Fatal Errors

Location: app/jobs/source_monitor/favicon_fetch_job.rb:26-31 and app/jobs/source_monitor/download_content_images_job.rb:63-68

Current Patterns:

FaviconFetchJob (lines 26-31):

```ruby
rescue ActiveRecord::Deadlocked
  raise  # Let job framework retry
rescue StandardError => error
  record_failed_attempt(source)
  log_error(source, error)
end
```

Issue: All non-deadlock StandardErrors are treated the same:

  • SocketTimeout (transient, should retry) → marks as failed + logs
  • InvalidImage (fatal, won't retry) → marks as failed + logs
  • S3WriteFailed (transient) → marks as failed + logs

There is no distinction between errors that should be retried and those that never will succeed.

DownloadContentImagesJob (lines 50-69):

```ruby
image_urls.each do |image_url|
  result = SourceMonitor::Images::Downloader.new(...).call
  next unless result
  # ... attach blob ...
rescue ActiveRecord::Deadlocked
  raise
rescue StandardError
  # Individual image failure should not block others
  next
end
```

Issue: The inner rescue catches ALL errors (download, attachment, blob creation) and silently continues. Can't distinguish:

  • Individual image download failure (acceptable, skip image) ✓
  • Blob creation failure (concerning, should retry entire job) ✗
  • Attachment failure (concerning, should retry entire job) ✗

Existing Patterns

Error Classification (FeedFetcher/RetryPolicy)

Typed Error Hierarchy:

```
FetchError (base)
  ├── TimeoutError (transient → retry 2x, 2min wait, 1hr circuit)
  ├── ConnectionError (transient → retry 3x, 5min wait, 1hr circuit)
  ├── HTTPError (branching)
  │   ├── 429 (transient → retry 2x, 15min wait, 90min circuit)
  │   ├── 5xx (transient → retry 2x, 10min wait, 90min circuit)
  │   └── 4xx (fatal → retry 1x, 45min wait, 2hr circuit)
  ├── ParsingError (fatal → retry 1x, 30min wait, 2hr circuit)
  ├── BlockedError (fatal → retry 1x, 1hr wait, 4hr circuit)
  ├── AuthenticationError (fatal → retry 1x, 1hr wait, 4hr circuit)
  └── UnexpectedResponseError (unknown → retry 1x, 30min wait, 2hr circuit)
```
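The per-class retry parameters in the tree above can be tabulated. The following is a sketch only, in plain Ruby with durations in seconds; the class names come from the tree, but the real RetryPolicy internals may compute these values differently:

```ruby
# Sketch: tabulating retry parameters per error class (illustrative, not the
# actual RetryPolicy implementation).
module SourceMonitor
  class FetchError < StandardError; end
  class TimeoutError < FetchError; end
  class ConnectionError < FetchError; end
  class ParsingError < FetchError; end

  RETRY_RULES = {
    TimeoutError    => { retries: 2, wait: 2 * 60,  circuit: 60 * 60 },
    ConnectionError => { retries: 3, wait: 5 * 60,  circuit: 60 * 60 },
    ParsingError    => { retries: 1, wait: 30 * 60, circuit: 2 * 60 * 60 }
  }.freeze

  # Unknown errors fall back to the UnexpectedResponseError row of the tree.
  DEFAULT_RULE = { retries: 1, wait: 30 * 60, circuit: 2 * 60 * 60 }.freeze

  def self.rule_for(error)
    RETRY_RULES.fetch(error.class, DEFAULT_RULE)
  end
end
```

Note that `rule_for` looks up the concrete class only; a production version would walk the ancestry (via `is_a?`) so that, for example, HTTPError subtypes inherit rules.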

RetryPolicy Decision Struct (lines 8-15):

```ruby
Decision = Struct.new(
  :retry?,               # boolean
  :wait,                 # duration
  :next_attempt,         # counter
  :open_circuit?,        # boolean
  :circuit_until,        # timestamp
  keyword_init: true
)
```

Usage in FetchFeedJob (lines 72-86):

```ruby
decision = SourceMonitor::Fetching::RetryPolicy.new(source:, error:, now: Time.current).decision
return raise error unless decision

if decision.retry?
  enqueue_retry!(source, decision)  # ← Service needed here
elsif decision.open_circuit?
  open_circuit!(source, decision)   # ← Service needed here
else
  reset_retry_state!(source)        # ← Service needed here
  raise error
end
```

Result Pattern Usage

Applied in 6 components:

  1. FeedFetcher::Result (8 fields: status, feed, response, body, error, item_processing, retry_decision)

    • Comprehensive; includes both success and failure details
    • Used by FetchRunner to decide post-fetch actions
  2. ItemCreator::Result (3 fields: item, status, matched_by)

    • Lightweight; just tracking create vs. update vs. unchanged
    • Used by EntryProcessor for counting
  3. RetentionPruner::Result (3 fields: removed_by_age, removed_by_limit, removed_total)

    • Instrumentation-focused; only counts
    • Does NOT include error information
  4. Scraping::Enqueuer::Result (3 fields + 4 status helper methods: enqueued?, already_enqueued?, deferred?, failure?)

    • Strong pattern; status + message + item + helpers
    • Enables controllers to show user-facing feedback
  5. ItemScraper::Result (5 fields: status, item, log, message, error)

    • Includes error details
    • Better than Enqueuer for error propagation
  6. FeedFetcher::EntryProcessingResult (8 fields: created, updated, unchanged, failed, items, errors, created_items, updated_items)

    • Detailed; tracks both counts and actual items
    • Includes error array for failed entries
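The six structs differ in fields but share the status idea. A hypothetical shared mixin (not present in the codebase today) shows how success/failure checks could be made uniform across them:

```ruby
# Hypothetical: a mixin the Result structs could share. The status names are
# drawn from the structs above; real components may define their own helpers.
module ResultStatus
  SUCCESS_STATUSES = %i[created updated unchanged enqueued success].freeze

  def success?
    SUCCESS_STATUSES.include?(status)
  end

  def failure?
    !success?
  end
end

# Example: an ItemCreator-style result adopting the mixin (illustrative names).
ItemResult = Struct.new(:status, :item, :matched_by, keyword_init: true) do
  include ResultStatus
end
```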

Completion Handlers (FetchRunner post-fetch)

Pipeline (FetchRunner#run, lines 53-83):

```
1. AdvisoryLock.with_lock do
2.   FeedFetcher.call                  # → Result
3.   RetentionHandler.call(result)     # → nil (no signal)
4.   FollowUpHandler.call(result)      # → nil (no signal)
5.   schedule_retry_if_needed(result)  # Uses Result.retry_decision
6.   mark_complete!(result)            # Uses Result.status
7. end
8. EventPublisher.call(result)         # → nil (no signal)
```

Handler Patterns:

| Handler | Success Path | Failure Path |
| --- | --- | --- |
| RetentionHandler | Returns nil | Logs error, returns nil |
| FollowUpHandler | Enqueues jobs for created items | Per-item rescue, logs, continues |
| EventPublisher | Dispatches event | No error handling (assumed in dispatcher) |

Issue: If RetentionHandler or FollowUpHandler fail, FetchRunner doesn't know. Execution continues as if the handler succeeded.


Job Logging Patterns

Consistent Format (when used):

```ruby
# Pattern 1: Structured JSON (ScrapeItemJob, Scraping::Enqueuer)
payload = {
  stage: "SourceMonitor::ScrapeItemJob#job:start",
  item_id: item&.id,
  source_id: item&.source_id,
  **extra
}.compact
Rails.logger.info("[SourceMonitor::ManualScrape] #{payload.to_json}")

# Pattern 2: Message string (FetchFeedJob, ImportSessionHealthCheckJob)
message = "[SourceMonitor::FetchFeedJob] Fetch already in progress for source #{@source_id}"
Rails.logger.info(message)

# Pattern 3: Error logging (RetentionHandler, FollowUpHandler)
Rails.logger.error("[SourceMonitor] Retention pruning failed for source #{source.id}: #{error.class} - #{error.message}")
```

Common Guard (all jobs):

```ruby
return unless defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger
```

Recommendations

Priority 1 — Extract Retry Orchestration Service (S2)

Name: SourceMonitor::Fetching::RetryOrchestrator (or RetryDecisionExecutor)

Responsibilities:

  • Accept a FetchError and Source
  • Query RetryPolicy for decision
  • Update source state atomically (with_lock)
  • Enqueue retry job with correct delay
  • Return whether action taken (success/failure)

Interface:

```ruby
result = SourceMonitor::Fetching::RetryOrchestrator.call(
  source:,
  error:,
  decision:,   # RetryPolicy::Decision
  job_class: SourceMonitor::FetchFeedJob,
  now: Time.current
)
# => Result(:retry_enqueued | :circuit_opened | :exhausted, source, error)
```

Benefit: FetchFeedJob shrinks from 147 to roughly 80 lines, and the retry logic becomes testable on its own.
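A minimal sketch of what such an orchestrator could look like, with all names assumed from the interface above and the side effects injected so the branches are testable without Rails:

```ruby
# Sketch only: executes a RetryPolicy decision outside the job.
# `enqueue` stands in for job_class.set(wait:).perform_later(source.id);
# `open_circuit` stands in for the source-state update done under with_lock.
module SourceMonitor
  module Fetching
    class RetryOrchestrator
      Result = Struct.new(:status, :source, :error, keyword_init: true)

      def self.call(source:, error:, decision:,
                    enqueue: ->(_source, _wait) {},
                    open_circuit: ->(_source, _until) {})
        if decision.retry?
          enqueue.call(source, decision.wait)
          Result.new(status: :retry_enqueued, source: source, error: error)
        elsif decision.open_circuit?
          open_circuit.call(source, decision.circuit_until)
          Result.new(status: :circuit_opened, source: source, error: error)
        else
          Result.new(status: :exhausted, source: source, error: error)
        end
      end
    end
  end
end
```

FetchFeedJob would then rescue the error, build the decision, call the orchestrator, and re-raise only when the result is `:exhausted`.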


Priority 2 — Add Result Pattern to Completion Handlers (S4)

For RetentionHandler:

```ruby
class RetentionHandler
  Result = Struct.new(:status, :removed_total, :error, keyword_init: true) do
    def success?
      status == :applied
    end
  end

  def call(source:, result:)
    pruner_result = pruner.call(source:, strategy: config.strategy)
    Result.new(status: :applied, removed_total: pruner_result.removed_total)
  rescue StandardError => error
    Rails.logger.error("[SourceMonitor] Retention pruning failed...")
    Result.new(status: :failed, error:)
  end
end
```

Usage in FetchRunner:

```ruby
retention_result = retention_handler.call(source:, result:)
unless retention_result&.success?
  Rails.logger.warn("[SourceMonitor::FetchRunner] Retention failed for source #{source.id}: #{retention_result&.error}")
  # Optionally: broadcast warning to user
end
```

Same for FollowUpHandler and EventPublisher.
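For FollowUpHandler, a sketch along the same lines (names assumed; the enqueue call is injected for testability) that keeps the per-item rescue but surfaces what failed:

```ruby
# Sketch only: FollowUpHandler returning a Result instead of swallowing errors.
class FollowUpHandler
  Result = Struct.new(:status, :enqueued_count, :errors, keyword_init: true) do
    def success?
      status == :applied && errors.empty?
    end
  end

  def call(created_items:, enqueue: ->(_item) {})
    errors = []
    enqueued = 0
    created_items.each do |item|
      enqueue.call(item)              # e.g. ScrapeItemJob.perform_later(item.id)
      enqueued += 1
    rescue StandardError => error
      errors << { item: item, error: error }  # keep going, but remember the failure
    end
    Result.new(status: :applied, enqueued_count: enqueued, errors: errors)
  end
end
```

FetchRunner can then log a warning (or alert) when `result.success?` is false, instead of silently continuing.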


Priority 3 — Standardize Error Logging (S5)

New Constants (top of each handler/job):

```ruby
COMPONENT_NAME = "SourceMonitor::FetchFeedJob"
ERROR_PREFIX = "[#{COMPONENT_NAME}]"

def log_error(stage, **payload)
  return unless defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger
  msg = "#{ERROR_PREFIX} #{stage}"
  payload.any? ? Rails.logger.error("#{msg}: #{payload.to_json}") : Rails.logger.error(msg)
rescue StandardError
  nil
end
```

Apply to:

  1. ScrapeItemJob — change info to warn for errors
  2. RetentionHandler — add component name
  3. FollowUpHandler — use helper instead of string interpolation
  4. Schedulers — add at least warn-level logging on error

Priority 4 — Classify Transient vs Fatal Errors (S6)

New Classification in FaviconFetchJob:

```ruby
class FaviconFetchJob
  TRANSIENT_ERRORS = [
    Timeout::Error,
    Errno::ETIMEDOUT,
    Faraday::TimeoutError
  ].freeze

  def perform(source_id)
    # ... existing guards ...
    result = SourceMonitor::Favicons::Discoverer.new(source.website_url).call
    # ...
  rescue *TRANSIENT_ERRORS => error
    # Retry without marking as failed attempt
    log_error("transient_error", error: error.class.name)
    raise  # Let job framework retry
  rescue ActiveRecord::Deadlocked
    raise
  rescue StandardError => error
    # Fatal error — mark failed and don't retry
    record_failed_attempt(source)
    log_error("fatal_error", error: error.class.name)
  end
end
```

Same for DownloadContentImagesJob:

```ruby
image_urls.each do |image_url|
  result = SourceMonitor::Images::Downloader.new(image_url).call
  # ...
rescue Faraday::TimeoutError => error
  # Transient — re-raise to job framework
  Rails.logger.warn("[...] Timeout downloading image, will retry: #{error.message}")
  raise
rescue StandardError => error
  # Fatal (invalid URL, not an image, etc.) — skip
  Rails.logger.warn("[...] Skipping image #{image_url}: #{error.class.name}")
  next  # ← Continue with next image
end
```

Priority 5 — Extract Services from Jobs (S3)

For ImportOpmlJob:

  • Create an ImportSessionProcessor service (the name recommended in S3)
  • Move lines 50-85 (process_entry logic) into the service
  • Job calls: Service.call(import_session) → returns result
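A sketch of the processor's shape, with all names hypothetical; in the real refactor, the duplicate-detection query and Source creation from process_entry would move behind the injected collaborators:

```ruby
# Sketch only: the entry-processing loop as a service with an accumulated result.
class ImportSessionProcessor
  Result = Struct.new(:created, :duplicates, :errors, keyword_init: true)

  # `entries` are parsed OPML rows; `existing_urls` stands in for the job's
  # duplicate-detection query; `create_source` stands in for Source creation.
  def call(entries:, existing_urls:, create_source: ->(entry) { entry })
    created, duplicates, errors = [], [], []
    entries.each do |entry|
      if existing_urls.include?(entry[:url])
        duplicates << entry
      else
        created << create_source.call(entry)
      end
    rescue StandardError => error
      errors << { entry: entry, error: error }  # accumulate, as the job does today
    end
    Result.new(created: created, duplicates: duplicates, errors: errors)
  end
end
```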

For ImportSessionHealthCheckJob:

  • Create ImportSessionUpdater service
  • Move lines 28-53 (state mutation) into the service
  • Job calls: Service.call(import_session, entry_id, result) → returns result

For ScrapeItemJob:

  • Move rate-limiting logic to Scraping::RateLimiter service
  • Job calls: RateLimiter.check_and_defer_if_needed(item, source) → returns wait_seconds or nil
  • Job calls: ItemScraper.call(item, source) if no deferral
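The rate-limiter piece could be a pure function, which makes the deferral rule trivially testable. The names and quota semantics below are assumptions; the real per-source rules live inside ScrapeItemJob today:

```ruby
# Sketch only: extracted rate-limit check. Returns seconds to defer, or nil
# when the scrape may proceed. The job would re-enqueue itself with
# set(wait: seconds) on deferral, and call ItemScraper otherwise.
module SourceMonitor
  module Scraping
    class RateLimiter
      def self.defer_seconds(recent_count:, quota:, window_seconds:)
        return nil if recent_count < quota
        window_seconds  # simplest policy: wait out the current window
      end
    end
  end
end
```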

Priority 6 — Add Deadlock Retry to Additional Jobs

Jobs missing deadlock rescue:

  • ScheduleFetchesJob — no database locks, but good practice
  • ItemCleanupJob — uses in_batches which could deadlock
  • LogCleanupJob — direct SQL batches, could deadlock
  • ScrapeItemJob — uses with_lock, should have deadlock rescue

Template:

```ruby
rescue_from ActiveRecord::Deadlocked do |error|
  Rails.logger.warn("[SourceMonitor::JobName] Deadlock: #{error.message}")
  retry_job(wait: 2.seconds + rand(3).seconds)
end
```

Priority 7 — Clean Up Dead Code

Locations:

  1. ImportSessionHealthCheckJob lines 58-59 — redundant catch-and-re-raise rescue (the rescue_from handler already covers it; see S1)
  2. FaviconFetchJob line 27 — comment "let job framework retry" is clear, but the raise pattern should be consistent

Summary Table

| Finding | Type | Severity | Component | Effort | Impact |
| --- | --- | --- | --- | --- | --- |
| S1: Redundant rescue code | Bug | Low | ImportSessionHealthCheckJob | Quick | Clarity |
| S2: Retry logic in job | Refactor | Medium | FetchFeedJob | Medium | Testability + reusability |
| S3: Business logic in jobs | Refactor | Medium | ImportOpmlJob, ImportSessionHealthCheckJob, ScrapeItemJob | High | Job simplification |
| S4: Missing Result patterns | Enhancement | Medium | RetentionHandler, FollowUpHandler, EventPublisher, Schedulers | Medium | Error propagation + visibility |
| S5: Inconsistent logging | Enhancement | Low | Multiple handlers/jobs | Low | Debuggability |
| S6: No error classification | Enhancement | High | FaviconFetchJob, DownloadContentImagesJob | Medium | Correct retry behavior |
| Additional: Missing deadlock rescue | Enhancement | Low | ScheduleFetchesJob, ItemCleanupJob, LogCleanupJob, ScrapeItemJob | Low | Resilience |

References

  • Job code: app/jobs/source_monitor/
  • Pipeline code: lib/source_monitor/fetching/, lib/source_monitor/items/, lib/source_monitor/scraping/
  • Error classes: lib/source_monitor/fetching/fetch_error.rb
  • Retry policy: lib/source_monitor/fetching/retry_policy.rb
  • Skills: sm-job, sm-pipeline-stage