.vbw-planning/milestones/07-rails-audit-and-refactoring/04-job-pipeline-reliability/04-01-RESEARCH.md
| Job | Queue | Lines | Purpose | Error Handling |
|---|---|---|---|---|
| FetchFeedJob | fetch | 147 | Fetch single source's feed | Robust retry/circuit (S2) |
| ScheduleFetchesJob | fetch | 32 | Enqueue due fetches | Basic logging only |
| ScrapeItemJob | scrape | 67 | Scrape single item | Basic try/catch + raise |
| SourceHealthCheckJob | maintenance | 85 | Health probe on source | Logs errors, records failures |
| ImportSessionHealthCheckJob | maintenance | 100 | Health-check import candidates | MISSING deadlock rescue (S1) |
| ItemCleanupJob | maintenance | 48 | Prune items by age/count | No error handling |
| LogCleanupJob | fetch | 18 | Remove old logs | No error handling (assume none in file) |
| ImportOpmlJob | maintenance | 160 | OPML import | Try/catch but accumulates errors in arrays |
| FaviconFetchJob | maintenance | 73 | Fetch favicon | Deadlock rescue + broad StandardError |
| DownloadContentImagesJob | maintenance | 75 | Download item images | Deadlock rescue in loop, broad catch |
| ApplicationJob | N/A | 18 | Base class | None |
Total job code: ~824 lines
| Component | Lines | Error Handling |
|---|---|---|
| FetchRunner | 157 | Advisory lock + context manager (try/catch/ensure) |
| FeedFetcher | 408 | Typed errors + Result pattern + retry decision |
| RetryPolicy | 90 | Decision struct, classifies error types |
| AdaptiveInterval | 141 | (interval calculation, no errors) |
| SourceUpdater | 200 | (state updates, no error handling) |
| EntryProcessor | 89 | (iteration, delegates to ItemCreator) |
| ItemCreator | 174 | Result pattern, save! on updates |
| RetentionHandler | 30 | Catches StandardError, logs, returns nil |
| FollowUpHandler | 43 | Per-item begin/rescue, logs, continues |
| EventPublisher | 20 | No error handling (dispatcher handles it) |
| Scheduler (fetch) | 86 | No error handling (instrumentation only) |
| Scheduler (scrape) | 41 | No error handling |
| Enqueuer (scrape) | 163 | Result pattern with statuses, no exceptions |
| RetentionPruner | 147 | Result pattern, no error handling (batches via in_batches) |
Total pipeline code: ~1,786 lines
Location: app/jobs/source_monitor/import_session_health_check_job.rb:4-100
Status: ✓ ALREADY FIXED (lines 13-16)
The job DOES include rescue_from ActiveRecord::Deadlocked with a jitter-backoff retry:
```ruby
rescue_from ActiveRecord::Deadlocked do |error|
  Rails.logger&.warn("[SourceMonitor::ImportSessionHealthCheckJob] Deadlock: #{error.message}")
  retry_job(wait: 2.seconds + rand(3).seconds)
end
```
However, the method body also contains its own rescue at lines 58-59, which re-raises the same error:

```ruby
rescue ActiveRecord::Deadlocked
  raise # Line 59: the re-raised error is then caught by rescue_from
end
```

This inner rescue is redundant rather than a bug: method-level rescue clauses run before `rescue_from` (which wraps the whole `perform` invocation), so the block executes, merely re-raises, and the `rescue_from` handler at line 13 catches the error exactly as it would have without it.

Recommendation: Remove the redundant rescue at lines 58-59 to clarify intent.
Location: app/jobs/source_monitor/fetch_feed_job.rb:4-146
Current State: The job contains 147 lines, with ~60 lines of custom retry/circuit-breaker logic:
- `handle_concurrency_error` (lines 35-46) — concurrency backoff + max attempts
- `handle_transient_error` (lines 69-86) — delegates to RetryPolicy
- `enqueue_retry!` (lines 89-105) — enqueues delayed retry + updates source state
- `open_circuit!` (lines 107-119) — opens circuit breaker + updates state
- `reset_retry_state!` (lines 121-130) — resets retry counter

Issue: This logic mixes job concerns (deserialization + delegation) with domain logic (retry orchestration, circuit-breaker state management).
Current Pattern: RetryPolicy (85 lines) makes decisions; FetchFeedJob executes them. Retry execution touches Source model 3+ times:
```ruby
source.with_lock do
  source.reload
  source.update!(fetch_retry_attempt:, next_fetch_at:, ...)
end
```
Recommendation: Extract a `FetchRetryOrchestrator` service (or similar) that executes `RetryPolicy` decisions: enqueueing delayed retries, opening the circuit breaker, and resetting retry state. This centralizes retry orchestration outside the job, making it reusable and testable.
Pattern Violations Found: business logic embedded in job methods (e.g. `process_entry`). Candidate extractions:

- an `ImportSessionProcessor` service that returns a result
- an `ImportSessionUpdater` service
- a `Scraping::RateLimiter` (or similar)

Current Pattern: All three jobs (ImportOpmlJob, ImportSessionHealthCheckJob, ScrapeItemJob) do perform shallow delegation to services (ItemScraper, HealthCheckBroadcaster, etc.), but they coordinate too many concerns. Ideally a job should be:
```ruby
def perform(id)
  record = Model.find_by(id:)
  return unless record

  Service.call(record) # ← Only this
end
```
Instead, they have state mutation, error handling, and re-enqueueing inside the job.
Current State:
| Component | Has Result? | Pattern |
|---|---|---|
| EventPublisher | No | Direct dispatch, no return value (20 lines) |
| RetentionHandler | No | Catches StandardError, logs, returns nil (30 lines) |
| FollowUpHandler | No | Per-item rescue, logs errors, continues (43 lines) |
| Scheduler (fetch) | No | No error handling, returns count (86 lines) |
| Scheduler (scrape) | No | No error handling, returns count (41 lines) |
| ItemCreator | Yes | Result struct with status (:created/:updated/:unchanged) |
| RetentionPruner | Yes | Result struct with counts (removed_by_age, removed_by_limit, removed_total) |
| Scraping::Enqueuer | Yes | Result struct with status + message |
| FeedFetcher | Yes | Result struct with status + error + retry_decision |
| ItemScraper | Yes | Result struct with status + item + log + error |
Issue: Completion handlers (RetentionHandler, FollowUpHandler, EventPublisher) and schedulers have no way to signal failure back to their callers.
Example Gap — RetentionHandler:
```ruby
def call(source:, result:)
  pruner.call(...)
rescue StandardError => error
  Rails.logger.error("[SourceMonitor] Retention pruning failed...")
  nil # ← No signal to caller that this failed
end
```
If retention fails, FetchRunner has no way to know, and the source continues as if pruning succeeded.
Current Logging Patterns:
| Component | Log Level | Format | Consistency |
|---|---|---|---|
| FetchFeedJob | error | `[SourceMonitor::FetchFeedJob] message` | ✓ Consistent |
| ImportSessionHealthCheckJob | error | `[SourceMonitor::ImportSessionHealthCheckJob] message` | ✓ Consistent |
| ScrapeItemJob | info | `[SourceMonitor::ManualScrape] payload.to_json` | ✗ Wrong level (should be error for failures) |
| SourceHealthCheckJob | error | `[SourceMonitor::SourceHealthCheckJob] message` | ✓ Consistent |
| RetentionHandler | error | `[SourceMonitor] Retention pruning failed...` | ✗ Missing job/handler name |
| FollowUpHandler | error | `[SourceMonitor] FollowUpHandler: failed...` | ✗ Inconsistent with other handlers |
| FeedFetcher | instrumentation | Struct fields + events | ✓ Structured |
| RetryPolicy | none | No logging | ✗ Silent failures possible |
Issues:

- `ScrapeItemJob` logs at `info` level even for errors (should be `warn` or `error`)
- `RetentionHandler` and `FollowUpHandler` prefixes omit the handler name, unlike the job loggers
- `RetryPolicy` does no logging at all, so silent failures are possible

Location: app/jobs/source_monitor/favicon_fetch_job.rb:26-31 and app/jobs/source_monitor/download_content_images_job.rb:63-68
Current Patterns:
```ruby
rescue ActiveRecord::Deadlocked
  raise # Let job framework retry
rescue StandardError => error
  record_failed_attempt(source)
  log_error(source, error)
end
```
Issue: All non-deadlock StandardErrors are treated the same:

- `SocketTimeout` (transient, should retry) → marks as failed + logs
- `InvalidImage` (fatal, won't retry) → marks as failed + logs
- `S3WriteFailed` (transient) → marks as failed + logs

No distinction is made between errors that should retry and those that won't.
```ruby
image_urls.each do |image_url|
  result = SourceMonitor::Images::Downloader.new(...).call
  next unless result
  # ... attach blob ...
rescue ActiveRecord::Deadlocked
  raise
rescue StandardError
  # Individual image failure should not block others
  next
end
```
Issue: The inner rescue catches ALL errors (download, attachment, blob creation) and silently continues, so transient network failures can't be distinguished from permanently invalid images or URLs.
Typed Error Hierarchy:
```
FetchError (base)
├── TimeoutError (transient → retry 2x, 2min wait, 1hr circuit)
├── ConnectionError (transient → retry 3x, 5min wait, 1hr circuit)
├── HTTPError (branching)
│   ├── 429 (transient → retry 2x, 15min wait, 90min circuit)
│   ├── 5xx (transient → retry 2x, 10min wait, 90min circuit)
│   └── 4xx (fatal → retry 1x, 45min wait, 2hr circuit)
├── ParsingError (fatal → retry 1x, 30min wait, 2hr circuit)
├── BlockedError (fatal → retry 1x, 1hr wait, 4hr circuit)
├── AuthenticationError (fatal → retry 1x, 1hr wait, 4hr circuit)
└── UnexpectedResponseError (unknown → retry 1x, 30min wait, 2hr circuit)
```
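A hedged sketch of how such a hierarchy might be declared. The class names follow the tree; the `transient?` predicate and the `HTTPError` constructor are illustrative assumptions, not the codebase's actual API:

```ruby
module SourceMonitor
  module Fetching
    # Base class; subclasses override transient? per the decision table.
    class FetchError < StandardError
      def transient? = false
    end

    class TimeoutError < FetchError
      def transient? = true
    end

    class ConnectionError < FetchError
      def transient? = true
    end

    # HTTP errors branch on status: 429 and 5xx are transient, other 4xx fatal.
    class HTTPError < FetchError
      attr_reader :status

      def initialize(message = nil, status:)
        @status = status
        super(message || "HTTP #{status}")
      end

      def transient? = status == 429 || (500..599).cover?(status)
    end

    class ParsingError < FetchError; end
    class BlockedError < FetchError; end
    class AuthenticationError < FetchError; end
    class UnexpectedResponseError < FetchError; end
  end
end

SourceMonitor::Fetching::HTTPError.new(status: 503).transient? # => true
SourceMonitor::Fetching::HTTPError.new(status: 404).transient? # => false
```

Rescue clauses can then branch on `transient?` (or on the subclass) instead of treating every StandardError alike.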
RetryPolicy Decision Struct (lines 8-15):
```ruby
Decision = Struct.new(
  :retry?,        # boolean
  :wait,          # duration
  :next_attempt,  # counter
  :open_circuit?, # boolean
  :circuit_until, # timestamp
  keyword_init: true
)
```
Usage in FetchFeedJob (lines 72-86):
```ruby
decision = SourceMonitor::Fetching::RetryPolicy.new(source:, error:, now: Time.current).decision
return raise error unless decision

if decision.retry?
  enqueue_retry!(source, decision)   # ← Service needed here
elsif decision.open_circuit?
  open_circuit!(source, decision)    # ← Service needed here
else
  reset_retry_state!(source)         # ← Service needed here
  raise error
end
```
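The decide-then-execute split can be sketched standalone. This toy policy mirrors the Decision struct above; the error kinds, limits, and waits echo the hierarchy table but are illustrative values, not the real RetryPolicy:

```ruby
# Toy RetryPolicy: classify by error kind and attempt count, return a
# Decision that the caller (job or orchestrator) executes.
Decision = Struct.new(:retry?, :wait, :next_attempt, :open_circuit?, :circuit_until,
                      keyword_init: true)

RETRY_LIMITS = { timeout: 2, connection: 3, parsing: 1 }.freeze
WAITS        = { timeout: 120, connection: 300, parsing: 1800 }.freeze  # seconds
CIRCUITS     = { timeout: 3600, connection: 3600, parsing: 7200 }.freeze

def decide(kind:, attempt:, now: Time.now)
  if attempt < RETRY_LIMITS.fetch(kind)
    Decision.new(retry?: true, wait: WAITS.fetch(kind),
                 next_attempt: attempt + 1, open_circuit?: false)
  else
    # Attempts exhausted: stop retrying and open the circuit for a while.
    Decision.new(retry?: false, open_circuit?: true,
                 next_attempt: attempt, circuit_until: now + CIRCUITS.fetch(kind))
  end
end

decide(kind: :timeout, attempt: 1).retry?        # => true
decide(kind: :timeout, attempt: 2).open_circuit? # => true
```

Because the decision is pure data, the policy can be unit-tested without touching the job or the database.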
Applied in 6 components:
- `FeedFetcher::Result` (8 fields: status, feed, response, body, error, item_processing, retry_decision)
- `ItemCreator::Result` (3 fields: item, status, matched_by)
- `RetentionPruner::Result` (3 fields: removed_by_age, removed_by_limit, removed_total)
- `Scraping::Enqueuer::Result` (3 fields + 4 status helper methods: enqueued?, already_enqueued?, deferred?, failure?)
- `ItemScraper::Result` (5 fields: status, item, log, message, error)
- `FeedFetcher::EntryProcessingResult` (8 fields: created, updated, unchanged, failed, items, errors, created_items, updated_items)
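A hedged sketch of the shared pattern: a `keyword_init` Struct carrying a `:status` plus payload fields, extended with predicate helpers. Field and helper names mirror `Scraping::Enqueuer::Result` as listed above (the third field is assumed to be `item`); the real definition may differ:

```ruby
module Scraping
  class Enqueuer
    # keyword_init Struct with status predicates, as used across the pipeline.
    Result = Struct.new(:status, :item, :message, keyword_init: true) do
      def enqueued?         = status == :enqueued
      def already_enqueued? = status == :already_enqueued
      def deferred?         = status == :deferred
      def failure?          = status == :failure
    end
  end
end

result = Scraping::Enqueuer::Result.new(status: :deferred, message: "rate limited")
result.deferred? # => true
result.failure?  # => false
```

Callers branch on the predicates rather than rescuing exceptions, which is what makes these components exception-free.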
Pipeline (FetchRunner#run, lines 53-83):
```
1. AdvisoryLock.with_lock do
2.   FeedFetcher.call                  # → Result
3.   RetentionHandler.call(result)     # → nil (no signal)
4.   FollowUpHandler.call(result)      # → nil (no signal)
5.   schedule_retry_if_needed(result)  # Uses Result.retry_decision
6.   mark_complete!(result)            # Uses Result.status
7. end
8. EventPublisher.call(result)         # → nil (no signal)
```
Handler Patterns:
| Handler | Success Path | Failure Path |
|---|---|---|
| RetentionHandler | Returns nil | Logs error, returns nil |
| FollowUpHandler | Enqueues jobs for created items | Per-item rescue, logs, continues |
| EventPublisher | Dispatches event | No error handling (assumed in dispatcher) |
Issue: If RetentionHandler or FollowUpHandler fail, FetchRunner doesn't know. Execution continues as if the handler succeeded.
Consistent Format (when used):
```ruby
# Pattern 1: Structured JSON (ScrapeItemJob, Scraping::Enqueuer)
payload = {
  stage: "SourceMonitor::ScrapeItemJob#job:start",
  item_id: item&.id,
  source_id: item&.source_id,
  **extra
}.compact
Rails.logger.info("[SourceMonitor::ManualScrape] #{payload.to_json}")

# Pattern 2: Message string (FetchFeedJob, ImportSessionHealthCheckJob)
message = "[SourceMonitor::FetchFeedJob] Fetch already in progress for source #{@source_id}"
Rails.logger.info(message)

# Pattern 3: Error logging (RetentionHandler, FollowUpHandler)
Rails.logger.error("[SourceMonitor] Retention pruning failed for source #{source.id}: #{error.class} - #{error.message}")
```
Common Guard (all jobs):
```ruby
return unless defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger
```
Name: SourceMonitor::Fetching::RetryOrchestrator (or RetryDecisionExecutor)
Responsibilities: given a `RetryPolicy::Decision`, enqueue the delayed retry, open the circuit breaker, or reset retry state (the logic currently inlined in FetchFeedJob).
Interface:
```ruby
result = SourceMonitor::Fetching::RetryOrchestrator.call(
  source:,
  error:,
  decision:, # RetryPolicy::Decision
  job_class: SourceMonitor::FetchFeedJob,
  now: Time.current
)
# => Result(:retry_enqueued | :circuit_opened | :exhausted, source, error)
```
Benefit: FetchFeedJob becomes 80 lines instead of 147; retry logic is testable independently.
For RetentionHandler:
```ruby
class RetentionHandler
  Result = Struct.new(:status, :removed_total, :error, keyword_init: true) do
    def success?
      status == :applied
    end
  end

  def call(source:, result:)
    pruner_result = pruner.call(source:, strategy: config.strategy)
    Result.new(status: :applied, removed_total: pruner_result.removed_total)
  rescue StandardError => error
    Rails.logger.error("[SourceMonitor] Retention pruning failed...")
    Result.new(status: :failed, error:)
  end
end
```
Usage in FetchRunner:
```ruby
retention_result = retention_handler.call(source:, result:)
unless retention_result&.success?
  Rails.logger.warn("[SourceMonitor::FetchRunner] Retention failed for source #{source.id}: #{retention_result&.error}")
  # Optionally: broadcast warning to user
end
```
Same for FollowUpHandler and EventPublisher.
New Constants (top of each handler/job):
```ruby
COMPONENT_NAME = "SourceMonitor::FetchFeedJob"
ERROR_PREFIX = "[#{COMPONENT_NAME}]"

def log_error(stage, **payload)
  return unless defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger

  msg = "#{ERROR_PREFIX} #{stage}"
  payload.any? ? Rails.logger.error("#{msg}: #{payload.to_json}") : Rails.logger.error(msg)
rescue StandardError
  nil
end
```
Apply to:

- `ScrapeItemJob` — change `info` to `warn` for errors
- `RetryPolicy` — add `warn`-level logging on error

New Classification in FaviconFetchJob:
```ruby
class FaviconFetchJob
  TRANSIENT_ERRORS = [
    Timeout::Error,
    Errno::ETIMEDOUT,
    Faraday::TimeoutError
  ].freeze

  def perform(source_id)
    # ... existing guards ...
    result = SourceMonitor::Favicons::Discoverer.new(source.website_url).call
    # ...
  rescue *TRANSIENT_ERRORS => error
    # Retry without marking as failed attempt
    log_error("transient_error", error: error.class.name)
    raise # Let job framework retry
  rescue ActiveRecord::Deadlocked
    raise
  rescue StandardError => error
    # Fatal error — mark failed and don't retry
    record_failed_attempt(source)
    log_error("fatal_error", error: error.class.name)
  end
end
```
Same for DownloadContentImagesJob:
```ruby
image_urls.each do |image_url|
  result = SourceMonitor::Images::Downloader.new(image_url).call
  # ...
rescue Faraday::TimeoutError => error
  # Transient — re-raise to job framework
  Rails.logger.warn("[...] Timeout downloading image, will retry: #{error.message}")
  raise
rescue StandardError => error
  # Fatal (invalid URL, not an image, etc.) — skip
  Rails.logger.warn("[...] Skipping image #{image_url}: #{error.class.name}")
  next # ← Continue with next image
end
```
For ImportOpmlJob:
ImportSessionImporter serviceService.call(import_session) → returns resultFor ImportSessionHealthCheckJob:
ImportSessionUpdater serviceService.call(import_session, entry_id, result) → returns resultFor ScrapeItemJob:
Scraping::RateLimiter serviceRateLimiter.check_and_defer_if_needed(item, source) → returns wait_seconds or nilItemScraper.call(item, source) if no deferralJobs missing deadlock rescue:
- `ScheduleFetchesJob` — no database locks, but good practice
- `ItemCleanupJob` — uses `in_batches`, which could deadlock
- `LogCleanupJob` — direct SQL batches, could deadlock
- `ScrapeItemJob` — uses `with_lock`, should have deadlock rescue

Template:
```ruby
rescue_from ActiveRecord::Deadlocked do |error|
  Rails.logger.warn("[SourceMonitor::JobName] Deadlock: #{error.message}")
  retry_job(wait: 2.seconds + rand(3).seconds)
end
```
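The jitter in the template keeps simultaneous deadlock retries from stampeding: `rand(3)` adds 0, 1, or 2 seconds, so the wait always lands between 2 and 4 seconds. A plain-Ruby sketch (method name is illustrative):

```ruby
# Jittered deadlock-retry wait: base delay plus a bounded random offset,
# mirroring `2.seconds + rand(3).seconds` in the template above.
def deadlock_retry_wait(base: 2, jitter: 3)
  base + rand(jitter) # rand(3) → 0, 1, or 2
end

Array.new(1000) { deadlock_retry_wait }.all? { |w| (2..4).cover?(w) } # => true
```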
Locations:
| Finding | Type | Severity | Component | Effort | Impact |
|---|---|---|---|---|---|
| S1: Dead rescue code | Bug | Low | ImportSessionHealthCheckJob | Quick | Clarity |
| S2: Retry logic in job | Refactor | Medium | FetchFeedJob | Medium | Testability + reusability |
| S3: Business logic in jobs | Refactor | Medium | ImportOpmlJob, ImportSessionHealthCheckJob, ScrapeItemJob | High | Job simplification |
| S4: Missing Result patterns | Enhancement | Medium | RetentionHandler, FollowUpHandler, EventPublisher, Schedulers | Medium | Error propagation + visibility |
| S5: Inconsistent logging | Enhancement | Low | Multiple handlers/jobs | Low | Debuggability |
| S6: No error classification | Enhancement | High | FaviconFetchJob, DownloadContentImagesJob | Medium | Correct retry behavior |
| Additional: Missing deadlock rescue | Enhancement | Low | ScheduleFetchesJob, ItemCleanupJob, LogCleanupJob, ScrapeItemJob | Low | Resilience |
- `app/jobs/source_monitor/`
- `lib/source_monitor/fetching/`, `lib/source_monitor/items/`, `lib/source_monitor/scraping/`
- `lib/source_monitor/fetching/fetch_error.rb`
- `lib/source_monitor/fetching/retry_policy.rb`
- `sm-job`, `sm-pipeline-stage`