# Upstash Workflow Best Practices
Apply these once your scaffold from implementation.md is in place.
## Error handling

Validate the payload up front, then wrap the step work in try/catch so failures come back as structured errors instead of unhandled throws:

```typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};
    if (!itemId) {
      return { success: false, error: 'Missing itemId in payload' };
    }
    try {
      const result = await context.run('step-name', () => doWork(itemId));
      return { success: true, itemId, result };
    } catch (error) {
      console.error('[workflow:error]', error);
      return {
        success: false,
        error: error instanceof Error ? error.message : 'Unknown error',
      };
    }
  },
  { flowControl: { ... } },
);
```
## Logging

Consistent prefixes make debugging much easier across QStash dashboards and grep:

```typescript
console.log('[{workflow}:{layer}] Starting with payload:', payload);
console.log('[{workflow}:{layer}] Processing items:', { count: items.length });
console.log('[{workflow}:{layer}] Completed:', result);
console.error('[{workflow}:{layer}:error]', error);
```
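If you want the prefix format enforced rather than hand-typed, a tiny helper can build it. This is a sketch; the `logPrefix` name is an assumption, not part of any Upstash API:

```typescript
// Hypothetical helper: builds the "[workflow:layer]" prefix used above so
// every log line in a workflow shares the exact same format.
function logPrefix(workflow: string, layer: string, level: 'info' | 'error' = 'info'): string {
  return level === 'error'
    ? `[${workflow}:${layer}:error]`
    : `[${workflow}:${layer}]`;
}

// Usage mirrors the lines above:
// console.log(logPrefix('sync', 'execute'), 'Processing item:', { itemId });
// console.error(logPrefix('sync', 'execute', 'error'), error);
```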
## Return shapes

Pick the shape that matches the layer's purpose: entry points return statistics, execution layers return per-item results.

```typescript
// Success
return { success: true, itemId, result, message: 'Optional success message' };

// Error
return { success: false, error: 'Error description', itemId };

// Statistics (entry point)
return {
  success: true,
  totalEligible: 100,
  toProcess: 80,
  alreadyProcessed: 20,
  dryRun: true, // if applicable
  message: 'Summary message',
};
```
## Flow control

Tune concurrency by layer: entry points are singletons, execution layers fan out.

```typescript
// Layer 1: Entry — single instance to avoid duplicate processing
flowControl: { key: '{workflow}.process', parallelism: 1, ratePerSecond: 1 }

// Layer 2: Pagination — moderate concurrency
flowControl: { key: '{workflow}.paginate', parallelism: 20, ratePerSecond: 5 }

// Layer 3: Execution — higher concurrency for parallel item work
flowControl: { key: '{workflow}.execute', parallelism: 10, ratePerSecond: 5 }
```
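To keep keys and limits from drifting between files, the defaults above can live in one factory. A sketch: `flowControlFor` and the `Layer` names are assumptions modeled on this document's examples, not an Upstash API:

```typescript
// Sketch: central registry for the per-layer defaults listed above.
interface FlowControl {
  key: string;
  parallelism: number;
  ratePerSecond: number;
}

type Layer = 'process' | 'paginate' | 'execute';

const LAYER_LIMITS: Record<Layer, Omit<FlowControl, 'key'>> = {
  process: { parallelism: 1, ratePerSecond: 1 }, // entry: singleton
  paginate: { parallelism: 20, ratePerSecond: 5 }, // pagination: moderate
  execute: { parallelism: 10, ratePerSecond: 5 }, // execution: fan-out
};

function flowControlFor(workflow: string, layer: Layer): FlowControl {
  return { key: `${workflow}.${layer}`, ...LAYER_LIMITS[layer] };
}
```

Each `serve()` call then passes `{ flowControl: flowControlFor('myWorkflow', 'execute') }` instead of an inline literal.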
Why these defaults: `parallelism: 1` on the entry layer means concurrent triggers can't both start the same batch.

## Unique step names

Give every `context.run()` call a unique `{workflow}:step-name` identifier and keep the calls flat:

```typescript
// ✅ Unique step names
await Promise.all(
  items.map((item) => context.run(`{workflow}:execute:${item.id}`, () => processItem(item))),
);

// ❌ Same step name — Upstash de-dupes by step name and you'll lose data
await Promise.all(items.map((item) => context.run(`{workflow}:execute`, () => processItem(item))));
```
## Payload validation

Validate at the top so failures are explicit, not silent `undefined` cascades:

```typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId, configId } = context.requestPayload ?? {};
    if (!itemId) return { success: false, error: 'Missing itemId in payload' };
    if (!configId) return { success: false, error: 'Missing configId in payload' };
    // Proceed with work...
  },
  { flowControl: { ... } },
);
```
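When several workflows repeat these checks, they can be factored into a small guard. A sketch: `requireFields` and its return shape are assumptions modeled on the error contract above:

```typescript
// Hypothetical guard: returns the standard error object for the first
// missing field, or null when the payload is complete.
function requireFields<T extends Record<string, unknown>>(
  payload: T | undefined,
  fields: (keyof T & string)[],
): { success: false; error: string } | null {
  for (const field of fields) {
    if (payload?.[field] === undefined || payload[field] === null) {
      return { success: false, error: `Missing ${field} in payload` };
    }
  }
  return null;
}

// In a handler:
// const invalid = requireFields(context.requestPayload, ['itemId', 'configId']);
// if (invalid) return invalid;
```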
## Database connections

Get the connection once per workflow: `getServerDB()` is async, so repeating it inside each step adds latency.

```typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};
    const db = await getServerDB();
    const item = await context.run('get-item', () => itemModel.findById(db, itemId));
    const result = await context.run('process-item', () => processItem(item));
    await context.run('save-result', () => resultModel.create(db, result));
    return { success: true, itemId, result };
  },
  { flowControl: { ... } },
);
```
## Testing

Integration tests should exercise both the dry-run statistics path and the full execution path:

```typescript
describe('WorkflowName', () => {
  it('should process items successfully', async () => {
    const items = await createTestItems();
    await WorkflowClass.triggerProcessItems({ dryRun: false });
    await waitForCompletion();
    const results = await getResults();
    expect(results).toHaveLength(items.length);
  });

  it('should support dryRun mode', async () => {
    const result = await WorkflowClass.triggerProcessItems({ dryRun: true });
    expect(result).toMatchObject({
      success: true,
      dryRun: true,
      totalEligible: expect.any(Number),
      toProcess: expect.any(Number),
    });
  });
});
```
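The test above assumes a `waitForCompletion()` helper. One plausible implementation polls an observable condition until it holds or a deadline passes; a sketch, with all names being assumptions:

```typescript
// Hypothetical polling helper: resolves once `isDone` reports true, throws
// after the timeout. The predicate is whatever your test can observe, e.g.
// a row count in the results table.
async function waitForCompletion(
  isDone: () => Promise<boolean>,
  { timeoutMs = 30_000, intervalMs = 500 } = {},
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await isDone()) return;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Workflow did not complete within ${timeoutMs}ms`);
}
```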
## Common pitfalls

### Duplicate `context.run()` step names

```typescript
// Bad — Upstash dedupes by step name
await Promise.all(items.map((item) => context.run('process', () => process(item))));

// Good
await Promise.all(items.map((item) => context.run(`process:${item.id}`, () => process(item))));
```
### Skipping payload validation

```typescript
// Bad — undefined cascades into a confusing failure later
const { itemId } = context.requestPayload ?? {};
const result = await process(itemId);

// Good — fail fast with a clear error
if (!itemId) return { success: false, error: 'Missing itemId' };
```
### Reprocessing completed items

```typescript
// Bad — duplicates work for items that were already processed
const allItems = await getAllItems();
await Promise.all(allItems.map((item) => triggerExecute(item)));

// Good — keeps the pipeline idempotent
const allItems = await getAllItems();
const itemsNeedingProcessing = await filterExisting(allItems);
await Promise.all(itemsNeedingProcessing.map((item) => triggerExecute(item)));
```
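The `filterExisting()` call above is assumed; a minimal version checks each item against storage and keeps only the unprocessed ones. A sketch, where `hasResult` stands in for whatever existence check your storage layer provides:

```typescript
// Hypothetical helper: drops items that already have a stored result so
// re-triggering the pipeline doesn't duplicate work.
async function filterExisting<T extends { id: string }>(
  items: T[],
  hasResult: (id: string) => Promise<boolean>,
): Promise<T[]> {
  const processed = await Promise.all(items.map((item) => hasResult(item.id)));
  return items.filter((_, i) => !processed[i]);
}
```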
### Inconsistent log formats

```typescript
// Bad — different prefixes, mixed formats
console.log('Starting workflow');
log.info('Processing item:', itemId);
console.log(`Done with ${itemId}`);

// Good — uniform prefix lets you grep by workflow+layer
console.log('[workflow:layer] Starting with payload:', payload);
console.log('[workflow:layer] Processing item:', { itemId });
console.log('[workflow:layer] Completed:', { itemId, result });
```