RxJS Like a Pro

.agents/skills/rxjs-like-a-pro/SKILL.md


This skill helps you write RxJS code that is idiomatic, composable, and free of common pitfalls. The core philosophy: keep logic in the observable chain. Every time you reach for .subscribe(), ask whether the work could instead be expressed as a transformation inside .pipe().

Reference files

For detailed examples and patterns, read the relevant reference file:

  • references/loading-state-patterns.md — Deriving loading/error state in the chain, the withLoadingState custom operator, and using scan to preserve previous results across loading states. Read when working with async data fetching that needs loading indicators.
  • references/massive-observable.md — How to refactor bloated new Observable() constructors into small focused pieces. Read when you see a new Observable callback longer than ~10 lines.
  • references/inner-observable-chains.md — Building rich inner observable sequences with timing, delays, and animation phases. Read when composing multi-step async sequences or replacing setTimeout patterns.
  • references/custom-operators.md — How to write inline and extracted custom operators with OperatorFunction. Read when extracting reusable stream logic.

The #1 Anti-pattern: Premature Subscribe

The most common RxJS mistake is subscribing too early and then doing imperative work inside the callback — tracking state in variables, calling functions with side effects, or worse, subscribing to another observable inside the callback (the "subscribe-in-subscribe" pattern).

Why this matters: when you subscribe early, you lose the power of the reactive chain. You can no longer compose, retry, cancel, debounce, or share that work. You've escaped from the declarative world into imperative spaghetti, and every new requirement (add a retry, add a timeout, combine with another stream) means more manual state management.

typescript
// ❌ Bad: subscribe-in-subscribe with manual state tracking
let currentData: Data | null = null
let loading = false

input$.subscribe((value) => {
  loading = true
  fetchData(value).subscribe((data) => {
    currentData = data
    loading = false
  })
})

// ✅ Good: everything is in the chain
const data$ = input$.pipe(switchMap((value) => fetchData(value)))

For loading state, derive it inside the chain using startWith — see references/loading-state-patterns.md.
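
As a rough illustration of that idea (not the withLoadingState operator from the reference file), the sketch below starts each inner request with a loading marker via startWith, so no external flag is needed. The LoadState type and the synchronous fetchData stub are assumptions made for this example only.

```typescript
import {Observable, Subject, of} from 'rxjs'
import {map, startWith, switchMap, tap} from 'rxjs/operators'

// Hypothetical state shape, assumed for illustration only
type LoadState<T> = {status: 'loading'} | {status: 'loaded'; data: T}

// Hypothetical stand-in for a real request; it resolves synchronously here
const fetchData = (value: string): Observable<string> => of(`result for ${value}`)

const input$ = new Subject<string>()
const loading: LoadState<string> = {status: 'loading'}

const state$: Observable<LoadState<string>> = input$.pipe(
  switchMap((value) =>
    fetchData(value).pipe(
      map((data): LoadState<string> => ({status: 'loaded', data})),
      // Every new request begins in the loading state
      startWith(loading),
    ),
  ),
)

const seen: LoadState<string>[] = []
state$.pipe(tap((state) => seen.push(state))).subscribe()

input$.next('a')
// seen now holds the loading marker followed by the loaded result
```

Because the loading marker is emitted by the same chain that produces the data, cancelling a request with switchMap also discards its stale loading state.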

The Massive new Observable() Antipattern

Another common antipattern is stuffing an entire program into a single new Observable(subscriber => { ... }) constructor — setting up listeners, resolving promises, subscribing to other observables, managing retry state, all in one giant callback. This is imperative code wearing an Observable costume.

The new Observable() constructor should be small and focused — a thin bridge from one non-reactive source into the reactive world. For promise-based sources, use defer(() => promise) instead. Retry logic, error handling, combining sources — all of that belongs in the operator chain.

See references/massive-observable.md for a full before/after example.
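
A minimal sketch of what "thin bridge" can mean in practice, assuming a hypothetical callback-based TinyEmitter class (invented here, not part of RxJS or the reference file). The constructor only attaches a listener and returns its teardown; filtering, mapping, and side effects stay in the operator chain.

```typescript
import {Observable} from 'rxjs'
import {filter, map, tap} from 'rxjs/operators'

type Listener = (n: number) => void

// Hypothetical callback-based source, standing in for any non-reactive API
class TinyEmitter {
  private listeners = new Set<Listener>()
  on(listener: Listener): void {
    this.listeners.add(listener)
  }
  off(listener: Listener): void {
    this.listeners.delete(listener)
  }
  emit(n: number): void {
    this.listeners.forEach((listener) => listener(n))
  }
  get listenerCount(): number {
    return this.listeners.size
  }
}

// The constructor only bridges: attach a listener, return the teardown
const fromTinyEmitter = (emitter: TinyEmitter): Observable<number> =>
  new Observable<number>((subscriber) => {
    const listener: Listener = (n) => subscriber.next(n)
    emitter.on(listener)
    return () => emitter.off(listener)
  })

const emitter = new TinyEmitter()
const received: string[] = []

// Everything else lives in the operator chain
const subscription = fromTinyEmitter(emitter)
  .pipe(
    filter((n) => n % 2 === 0),
    map((n) => `even:${n}`),
    tap((label) => received.push(label)),
  )
  .subscribe()

emitter.emit(1)
emitter.emit(2)
emitter.emit(4)
subscription.unsubscribe()
emitter.emit(6) // no listener is attached any more
// received is ['even:2', 'even:4']
```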

Choosing the Right Flattening Operator

| Operator | Behavior | Use when |
| --- | --- | --- |
| switchMap | Cancels previous inner when new value arrives | User input, search-as-you-type, route changes (only the latest matters) |
| mergeMap | Runs all inner observables concurrently | Independent operations where all results are needed (logging, fire-and-forget) |
| concatMap | Queues inner observables, runs in order | Order matters and nothing should be dropped (sequential writes, queues) |
| exhaustMap | Ignores new values while inner is running | Preventing duplicate submissions (form submit clicks) |

Default to switchMap for most UI/request scenarios.
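
To make the differences concrete, the sketch below replays one scenario through all four operators: 'a' arrives, then 'b' arrives while the inner observable for 'a' is still open. The trace helper and its Subject-backed inner observables are assumptions for illustration; real inner observables would usually be requests.

```typescript
import type {OperatorFunction} from 'rxjs'
import {Observable, Subject} from 'rxjs'
import {concatMap, exhaustMap, mergeMap, switchMap, tap} from 'rxjs/operators'

type Flatten = (project: (key: string) => Observable<string>) => OperatorFunction<string, string>

// Hypothetical helper: runs the same scenario through one flattening operator
function trace(flatten: Flatten): string[] {
  const outer$ = new Subject<string>()
  const inners = new Map<string, Subject<string>>()
  const log: string[] = []

  // Called by the operator whenever it decides to start an inner observable
  const project = (key: string): Observable<string> => {
    log.push(`start:${key}`)
    const inner$ = new Subject<string>()
    inners.set(key, inner$)
    return inner$
  }
  // Emits a result and completes the inner for `key`, if it was ever started
  const finish = (key: string): void => {
    inners.get(key)?.next(`${key}-result`)
    inners.get(key)?.complete()
  }

  outer$.pipe(flatten(project), tap((value) => log.push(value))).subscribe()
  outer$.next('a')
  outer$.next('b') // arrives while the inner for 'a' is still open
  finish('a')
  finish('b')
  return log
}

const traces = {
  switchMap: trace((project) => switchMap(project)), // start:a, start:b, b-result
  mergeMap: trace((project) => mergeMap(project)), // start:a, start:b, a-result, b-result
  concatMap: trace((project) => concatMap(project)), // start:a, a-result, start:b, b-result
  exhaustMap: trace((project) => exhaustMap(project)), // start:a, a-result
}
```

Note that concatMap does not even start the inner observable for 'b' until 'a' completes, while exhaustMap never starts it at all.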

The inner observable doesn't have to be a single request — it can be an entire timeline of events using concat, merge, timer, delay. See references/inner-observable-chains.md for animation and timing examples.

Error Handling

Put catchError on the inner observable when you want the outer stream to keep running. Put it on the outer stream only when you truly want to replace the entire stream on error:

typescript
// ❌ Bad: catchError on outer stream kills it for good
source$.pipe(
  switchMap((value) => fetchData(value)),
  catchError((err) => of(fallback)),
)

// ✅ Good: catchError inside switchMap — outer stream survives
source$.pipe(switchMap((value) => fetchData(value).pipe(catchError((err) => of(fallback)))))
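
The difference is easy to observe with a runnable sketch. The fetchData stub below (id 2 always fails) and the collect helper are assumptions made for this example; they push ids 1, 2, 3 through each placement and record what comes out.

```typescript
import {Observable, Subject, of, throwError} from 'rxjs'
import {catchError, switchMap, tap} from 'rxjs/operators'

// Hypothetical request stub: id 2 fails, every other id succeeds
const fetchData = (id: number): Observable<string> =>
  id === 2 ? throwError(() => new Error('request failed')) : of(`data-${id}`)

// Hypothetical helper: pushes ids 1, 2, 3 through a pipeline and records the output
function collect(build: (source$: Observable<number>) => Observable<string>): string[] {
  const source$ = new Subject<number>()
  const out: string[] = []
  build(source$)
    .pipe(tap((value) => out.push(value)))
    .subscribe()
  for (const id of [1, 2, 3]) source$.next(id)
  return out
}

// catchError on the outer stream: the fallback is emitted, then the stream completes
const outerCatch = collect((source$) =>
  source$.pipe(
    switchMap((id) => fetchData(id)),
    catchError(() => of('fallback')),
  ),
)
// outerCatch is ['data-1', 'fallback']; id 3 is never processed

// catchError on the inner observable: only the failed request is replaced
const innerCatch = collect((source$) =>
  source$.pipe(switchMap((id) => fetchData(id).pipe(catchError(() => of('fallback'))))),
)
// innerCatch is ['data-1', 'fallback', 'data-3']
```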

Same principle applies to retry — retry the inner operation, not the entire outer stream:

typescript
source$.pipe(
  switchMap((value) =>
    fetchData(value).pipe(
      retry({count: 3, delay: 1000}),
      catchError((err) => of(fallback)),
    ),
  ),
)
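
A small sketch of why retry works on the inner operation: retry resubscribes to its source, so the source must redo the work on each subscription. The flaky$ observable below is a hypothetical stand-in that fails twice before succeeding, and the count-only form of retry is used so the example stays synchronous.

```typescript
import {defer, of, throwError} from 'rxjs'
import {catchError, retry, tap} from 'rxjs/operators'

let attempts = 0

// Hypothetical flaky operation: fails on the first two subscriptions, then succeeds.
// defer() re-runs this factory on every (re)subscription.
const flaky$ = defer(() => {
  attempts += 1
  return attempts < 3 ? throwError(() => new Error(`attempt ${attempts} failed`)) : of('ok')
})

const results: string[] = []
flaky$
  .pipe(
    retry(3), // count only; a delay would make the retries asynchronous
    catchError(() => of('fallback')), // reached only if every retry fails
    tap((value) => results.push(value)),
  )
  .subscribe()
// attempts is 3 and results is ['ok']; the fallback was never needed
```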

Avoiding Memory Leaks

The fewer manual subscriptions, the fewer chances to leak. In order of preference:

  1. Don't subscribe at all — let the framework handle subscription lifecycle where possible
  2. Use operators that complete naturally: first(), take(n), takeUntil(destroy$)
  3. Use takeUntil with a notifier:
typescript
const destroy$ = new Subject<void>();
someObservable$.pipe(
  takeUntil(destroy$),
).subscribe((value) => { /* ... */ });

// In teardown: destroy$.next(); destroy$.complete();

takeUntil should generally be the last operator in the pipe. Operators placed after it (especially flattening operators) can create inner subscriptions that takeUntil cannot tear down, causing leaks.

  4. Compose into a single subscription: if you have multiple independent streams with side effects, merge them into one and subscribe once.
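
The last two options combine naturally. In the sketch below, two hypothetical event sources (modeled as Subjects so the example runs without a DOM) are merged into one stream with a single takeUntil and a single subscribe.

```typescript
import {Subject, merge} from 'rxjs'
import {takeUntil, tap} from 'rxjs/operators'

const destroy$ = new Subject<void>()
// Hypothetical event sources, assumed for illustration
const clicks$ = new Subject<string>()
const keys$ = new Subject<string>()
const log: string[] = []

// Two independent side-effect streams, merged so there is one subscription to manage
merge(
  clicks$.pipe(tap((target) => log.push(`click:${target}`))),
  keys$.pipe(tap((key) => log.push(`key:${key}`))),
)
  .pipe(takeUntil(destroy$)) // last operator in the pipe
  .subscribe()

clicks$.next('save')
keys$.next('Enter')

// Teardown
destroy$.next()
destroy$.complete()

clicks$.next('ignored') // nothing is subscribed any more
// log is ['click:save', 'key:Enter']
```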

Hot vs Cold

  • Cold observables (new Observable(...), of(), HTTP requests) create a new execution per subscriber
  • Hot observables (Subject, fromEvent) share a single execution

Share cold observables with shareReplay({ bufferSize: 1, refCount: true }). Always use refCount: true. Without it, the source subscription stays alive after all subscribers unsubscribe, which is a memory leak whenever the source never completes.
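
The following sketch counts executions to show the difference. The cold$ observable is a hypothetical source that emits its own execution number, which makes it visible whether subscribers share an execution.

```typescript
import {Observable} from 'rxjs'
import {shareReplay, tap} from 'rxjs/operators'

let executions = 0

// Cold: the constructor callback runs once per subscriber
const cold$ = new Observable<number>((subscriber) => {
  executions += 1
  subscriber.next(executions)
})

const first: number[] = []
const second: number[] = []
cold$.pipe(tap((n) => first.push(n))).subscribe()
cold$.pipe(tap((n) => second.push(n))).subscribe()
const executionsWhenCold = executions // 2: one execution per subscriber

// Shared: one execution, and the late subscriber gets the replayed value
const shared$ = cold$.pipe(shareReplay({bufferSize: 1, refCount: true}))
const third: number[] = []
const fourth: number[] = []
const subThird = shared$.pipe(tap((n) => third.push(n))).subscribe()
const subFourth = shared$.pipe(tap((n) => fourth.push(n))).subscribe()
const executionsWhenShared = executions - executionsWhenCold // 1

// With refCount: true, the source is released once the last subscriber leaves
subThird.unsubscribe()
subFourth.unsubscribe()
```

Here first and second receive different values (1 and 2), while third and fourth both receive 3 from the single shared execution.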

Deriving State Reactively

Instead of mutable variables updated from multiple subscriptions, derive state from streams:

typescript
// ❌ Bad: mutable state, with windows where items and filter are out of sync
let items: Item[] = []
let filter = ''
items$.subscribe((i) => {
  items = i
  recompute()
})
filter$.subscribe((f) => {
  filter = f
  recompute()
})

// ✅ Good: always consistent
const filteredItems$ = combineLatest([items$, filter$]).pipe(
  map(([items, filter]) => items.filter((item) => item.name.includes(filter))),
)

combineLatest vs withLatestFrom: combineLatest emits when any input emits (all inputs drive output). withLatestFrom emits only when the source emits (one driver, others are context).

startWith: combineLatest won't emit until every input has emitted at least once. Use startWith to provide initial values and unblock the stream.
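
A short sketch of all three points, assuming Subject-backed items$ and filter$ streams and a hypothetical applyFilter helper. The same inputs are fed to a combineLatest pipeline and a withLatestFrom pipeline so the outputs can be compared.

```typescript
import {Subject, combineLatest} from 'rxjs'
import {map, startWith, tap, withLatestFrom} from 'rxjs/operators'

const items$ = new Subject<string[]>()
const filter$ = new Subject<string>()
// startWith('') supplies an initial filter so neither pipeline waits for user input
const filterWithDefault$ = filter$.pipe(startWith(''))

// Hypothetical helper shared by both pipelines
const applyFilter = ([items, filter]: [string[], string]): string[] =>
  items.filter((item) => item.includes(filter))

// combineLatest: either input drives the output
const fromCombineLatest: string[][] = []
combineLatest([items$, filterWithDefault$])
  .pipe(
    map(applyFilter),
    tap((result) => fromCombineLatest.push(result)),
  )
  .subscribe()

// withLatestFrom: only items$ drives the output; the filter is context
const fromWithLatestFrom: string[][] = []
items$
  .pipe(
    withLatestFrom(filterWithDefault$),
    map(applyFilter),
    tap((result) => fromWithLatestFrom.push(result)),
  )
  .subscribe()

items$.next(['apple', 'banana'])
filter$.next('an') // re-emits for combineLatest only
items$.next(['mango', 'kiwi'])
// fromCombineLatest:  [['apple', 'banana'], ['banana'], ['mango']]
// fromWithLatestFrom: [['apple', 'banana'], ['mango']]
```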

Subjects: Use Sparingly

Subject, BehaviorSubject, ReplaySubject are escape hatches for bridging imperative and reactive code. Appropriate for event buses and bridging callbacks. Not appropriate as general-purpose state containers — if you're calling .next() in multiple places to keep a Subject in sync, use a derived stream instead.

Custom Operators

Don't be afraid to write them — they're just functions with the signature (source: Observable<A>) => Observable<B>. Extract repeated .pipe() chains into named operators with OperatorFunction<In, Out>. See references/custom-operators.md for inline and extracted examples.
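
As one possible shape, here are two small operators typed with OperatorFunction. The names filterNullish and multiplyBy are invented for this sketch and are not taken from the reference file.

```typescript
import type {OperatorFunction} from 'rxjs'
import {from} from 'rxjs'
import {filter, map, tap} from 'rxjs/operators'

// Extracted operator: drops null/undefined and narrows the element type
const filterNullish =
  <T>(): OperatorFunction<T | null | undefined, T> =>
  (source) =>
    source.pipe(filter((value): value is T => value !== null && value !== undefined))

// Parameterised operator: a plain function that returns an OperatorFunction
const multiplyBy =
  (factor: number): OperatorFunction<number, number> =>
  (source) =>
    source.pipe(map((n) => n * factor))

const input: Array<number | null | undefined> = [1, null, 2, undefined, 3]
const results: number[] = []

from(input)
  .pipe(
    filterNullish<number>(),
    multiplyBy(10),
    tap((n) => results.push(n)),
  )
  .subscribe()
// results is [10, 20, 30]
```

Both operators are ordinary functions, so they can be unit-tested on their own and reused across pipelines.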

Side Effects Belong in tap, Not in subscribe

A good rule of thumb: .subscribe() should have no arguments. All side effects — logging, updating the DOM, writing to a database, sending analytics — belong in tap inside the chain. The .subscribe() at the end just activates the stream.

typescript
// ❌ Bad: side effects crammed into subscribe
source$.pipe(switchMap((value) => fetchData(value))).subscribe(
  (data) => {
    updateUI(data)
    logAnalytics('data_loaded', data)
    cache.set(data)
  },
  (err) => showError(err),
)

// ✅ Good: side effects in tap, subscribe just activates
source$
  .pipe(
    switchMap((value) => fetchData(value)),
    tap((data) => updateUI(data)),
    tap((data) => logAnalytics('data_loaded', data)),
    tap((data) => cache.set(data)),
    tap({error: (err) => showError(err)}),
  )
  .subscribe()

Why this matters: when side effects are in the chain, they're composable. You can add, remove, or reorder them. You can put a filter between them. You can share the stream and have different subscribers without duplicating side-effect logic. When everything is stuffed into .subscribe(), you've lost all of that.

tap also accepts an observer object with lifecycle hooks — particularly useful for debugging:

typescript
source$.pipe(
  tap({
    subscribe: () => console.log('subscribed!'),
    next: (value) => console.log('value:', value),
    error: (err) => console.log('error:', err),
    complete: () => console.log('complete'),
    unsubscribe: () => console.log('unsubscribed'),
    finalize: () => console.log('finalized (complete or unsubscribe)'),
  }),
)

The subscribe hook is especially handy for debugging "why isn't my stream emitting?" — it confirms whether anything is actually subscribing.

Avoid Unnecessary Promise Conversion

firstValueFrom/lastValueFrom are appropriate for one-shot interop with promise-based APIs. They're a code smell when used inside subscribe callbacks to avoid learning the reactive approach — that work belongs in the chain with switchMap.

Quick Reference: Common Refactoring Patterns

| Anti-pattern | Refactoring |
| --- | --- |
| a$.subscribe(x => b$.subscribe(y => ...)) | a$.pipe(switchMap(x => b$)) (or mergeMap/concatMap/exhaustMap) |
| Mutable variable updated in subscribe | scan() or combineLatest to derive state |
| setTimeout inside subscribe | delay(), timer(), or debounceTime() |
| if guard in subscribe to skip values | filter() before subscribe |
| try/catch inside subscribe | catchError() in the pipe |
| Manual request cancellation flags | switchMap (auto-cancels previous) |
| Multiple subscribes to same cold observable | shareReplay({ bufferSize: 1, refCount: true }) |
| .subscribe() just to trigger side effects | tap() for side effects, keep the chain going |
| Massive new Observable() constructor | Small focused constructors + defer() + operator composition |
| await firstValueFrom() inside subscribe | switchMap (stay in the chain) |