packages/sync-service/plans/2026-03-09-issue-3985/plan.md
See analysis.md for detailed root cause analysis.
- 398caa38a -- Generalize Support.StorageTracer to trace calls from any module
- 4049f75db -- Add a regression test for #3985 (in consumer_test.exs)

The regression test is already in place and will be the primary validation for this fix.
File: lib/electric/replication/shape_log_collector/flush_tracker.ex
Merge the two handle_txn_fragment clauses into one:
- Remove the `commit: nil` no-op clause (lines 149-155).
- Drop the `commit: %Commit{}` pattern requirement from the main clause (line 104). Accept any `%TransactionFragment{}`.
- Remove the now-unused `Commit` alias (line 3).
- Always update shape tracking (the `last_flushed` map and `min_incomplete_flush_tree`).
- Only update `last_seen_offset` when `commit != nil`.
- Only call `update_global_offset` when `commit != nil` and `last_flushed == %{}`.

Sketch:
def handle_txn_fragment(
      %__MODULE__{
        min_incomplete_flush_tree: min_incomplete_flush_tree,
        last_flushed: last_flushed
      } = state,
      %TransactionFragment{last_log_offset: last_log_offset, commit: commit},
      affected_shapes
    ) do
  prev_log_offset = %LogOffset{tx_offset: last_log_offset.tx_offset - 1}

  {last_flushed, new_shape_ids} =
    Enum.reduce(
      affected_shapes,
      {last_flushed, MapSet.new()},
      fn shape, {new_last_flushed, new_shape_ids} ->
        case Map.fetch(new_last_flushed, shape) do
          {:ok, {_, last_flushed_offset}} ->
            {Map.put(new_last_flushed, shape, {last_log_offset, last_flushed_offset}),
             new_shape_ids}

          :error ->
            {Map.put(new_last_flushed, shape, {last_log_offset, prev_log_offset}),
             MapSet.put(new_shape_ids, shape)}
        end
      end
    )

  min_incomplete_flush_tree =
    if MapSet.size(new_shape_ids) == 0,
      do: min_incomplete_flush_tree,
      else: add_to_tree(min_incomplete_flush_tree, prev_log_offset, new_shape_ids)

  state = %__MODULE__{
    state
    | last_flushed: last_flushed,
      min_incomplete_flush_tree: min_incomplete_flush_tree
  }

  if not is_nil(commit) do
    state = %__MODULE__{state | last_seen_offset: last_log_offset}

    if last_flushed == %{} do
      update_global_offset(state)
    else
      state
    end
  else
    state
  end
end
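
To make the intended behavior concrete, here is a minimal, self-contained toy model of the merged clause. It is only a sketch under stated assumptions: `ToyFlushTracker`, the integer offsets, the `commit?` flag, and the `global_offset` field are hypothetical stand-ins for illustration and are not part of Electric's API. It omits `min_incomplete_flush_tree` entirely and replaces `update_global_offset/1` with a plain field update.

```elixir
# Toy model of the merged handle_txn_fragment clause.
# Everything here is hypothetical: integers stand in for LogOffset values,
# `commit?` stands in for `commit != nil`, and `global_offset` stands in
# for whatever update_global_offset/1 does in the real module.
defmodule ToyFlushTracker do
  defstruct last_flushed: %{}, last_seen_offset: 0, global_offset: 0

  def new, do: %__MODULE__{}

  def handle_txn_fragment(%__MODULE__{} = state, offset, commit?, affected_shapes) do
    prev_offset = offset - 1

    # Shape tracking is updated for every fragment, commit or not.
    last_flushed =
      Enum.reduce(affected_shapes, state.last_flushed, fn shape, acc ->
        # New shapes start at {last_sent, prev_offset}; known shapes only
        # advance last_sent and keep their last flushed offset.
        Map.update(acc, shape, {offset, prev_offset}, fn {_last_sent, flushed} ->
          {offset, flushed}
        end)
      end)

    state = %__MODULE__{state | last_flushed: last_flushed}

    if commit? do
      # Only a commit fragment moves last_seen_offset.
      state = %__MODULE__{state | last_seen_offset: offset}

      # With no shapes pending, the global offset can advance right away.
      if last_flushed == %{} do
        %__MODULE__{state | global_offset: offset}
      else
        state
      end
    else
      state
    end
  end
end

state =
  ToyFlushTracker.new()
  |> ToyFlushTracker.handle_txn_fragment(10, false, ["shape-a"])
  |> ToyFlushTracker.handle_txn_fragment(11, true, ["shape-a"])

IO.inspect(state.last_flushed)     # %{"shape-a" => {11, 9}}
IO.inspect(state.last_seen_offset) # 11
```

In this toy run the non-commit fragment already registers `"shape-a"` as pending, so the later commit fragment only advances the last-sent offset and `last_seen_offset`. The global offset stays put until the shape reports a flush, which is the property the regression test for #3985 is meant to protect.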
File: lib/electric/replication/shape_log_collector.ex
In publish/2 (lines 572-581), replace the case event do block with an
unconditional call:
flush_tracker = FlushTracker.handle_txn_fragment(state.flush_tracker, event, affected_shapes)
Steps 1 and 2 can be done in a single commit since the ShapeLogCollector change depends on FlushTracker accepting non-commit fragments.
File: test/electric/replication/shape_log_collector/flush_tracker_test.exs
Update existing tests for the new behavior:
Add new tests:
last_sent progressively, then commit finalizes and notifies.

Run the regression test added in commit 4049f75db:
cd packages/sync-service
mix test test/electric/shapes/consumer_test.exs \
--only "flush notification for multi-fragment txn is not lost"
Run the full FlushTracker test suite:
mix test test/electric/replication/shape_log_collector/flush_tracker_test.exs
Run the full consumer test suite:
mix test test/electric/shapes/consumer_test.exs