Data Ingestion & File Processing
Build workflows that process file uploads, ingest events, and handle incoming data with built-in durability.
Many teams use workflows to process incoming data: file uploads, event streams, emails, and more. Workflows make this processing durable: if it fails halfway through a 10,000-row CSV import, the workflow resumes from the last completed step rather than restarting from scratch.
This guide covers the most common data ingestion patterns, starting simple and building up to production-ready examples.
File Upload Processing
The simplest data ingestion pattern receives a file, validates it, processes it, and stores the result. Each operation is a step, so a failure in any stage retries only that stage.

import { getStepMetadata } from "workflow";

type FileResult = {
  rowCount: number;
  errors: string[];
};

// Download the file and reject empty input before any processing happens.
async function validateFile(fileUrl: string) {
  "use step";

  const response = await fetch(fileUrl);
  if (!response.ok) {
    // fetch() only rejects on network errors, so surface HTTP errors explicitly
    throw new Error(`Download failed: ${response.status}`);
  }
  const content = await response.text();
  const rows = content.split("\n").filter(Boolean);

  if (rows.length === 0) {
    throw new Error("File is empty");
  }

  return { content, rowCount: rows.length };
}

// Normalize every field: trim whitespace and lowercase.
async function transformRows(content: string) {
  "use step";

  const rows = content.split("\n").filter(Boolean);
  const transformed = rows.map((row) => {
    const fields = row.split(",");
    return fields.map((f) => f.trim().toLowerCase()).join(",");
  });

  return transformed;
}

async function storeResults(rows: string[]) {
  "use step";

  const { stepId } = getStepMetadata();

  // Write to your database or storage service
  const response = await fetch("https://api.example.com/import", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      // Lets the receiving service deduplicate if this step is retried
      "Idempotency-Key": stepId,
    },
    body: JSON.stringify({ rows }),
  });
  if (!response.ok) {
    // Throw so that a failed write triggers a retry of this step
    throw new Error(`Import failed: ${response.status}`);
  }

  return { rowCount: rows.length };
}

export async function processFileWorkflow(fileUrl: string) {
  "use workflow";

  const { content } = await validateFile(fileUrl);
  const transformed = await transformRows(content);
  const result = await storeResults(transformed);

  return result;
}

If storeResults fails after validateFile and transformRows succeed, only the store step retries. The validated and transformed data is already recorded in the event log.
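
To make the replay behavior concrete, here is a toy model of it. It is only a sketch, assuming the runtime records each step's return value and reuses it on a later attempt; `runStep`, `EventLog`, and `demo` are hypothetical names, the log is an in-memory Map, and everything is synchronous for brevity. It is not how the workflow package is implemented.

```typescript
// Toy model only: an in-memory "event log" keyed by step name.
type EventLog = Map<string, unknown>;

function runStep<T>(log: EventLog, name: string, fn: () => T): T {
  if (log.has(name)) {
    // The step completed in an earlier attempt: reuse the recorded result.
    return log.get(name) as T;
  }
  const result = fn(); // if this throws, nothing is recorded for the step
  log.set(name, result);
  return result;
}

function demo() {
  const log: EventLog = new Map();
  // Counts how many times each step body actually executes.
  const executions = { validate: 0, transform: 0, store: 0 };
  let storeShouldFail = true;

  const toyWorkflow = (): number => {
    const rows = runStep(log, "validate", () => {
      executions.validate++;
      return ["A,B", "C,D"];
    });
    const transformed = runStep(log, "transform", () => {
      executions.transform++;
      return rows.map((row) => row.toLowerCase());
    });
    return runStep(log, "store", () => {
      executions.store++;
      if (storeShouldFail) {
        storeShouldFail = false;
        throw new Error("storage unavailable");
      }
      return transformed.length;
    });
  };

  try {
    toyWorkflow(); // first attempt: the store step throws
  } catch {
    // a real runtime would schedule the retry; here we simply run again
  }
  const stored = toyWorkflow(); // second attempt: only the store step executes
  return { stored, executions };
}
```

Calling `demo()` runs the validate and transform bodies once each and the store body twice, which is the behavior described above in miniature.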
Streaming Large Files
For files too large to hold in memory, pass streams between steps instead of loading the full content. See the streaming foundation guide for more on how streams work.
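
One detail changes when moving from whole-file to chunked processing: chunk boundaries rarely line up with row boundaries, so splitting each chunk on "\n" on its own would cut rows in half. The sketch below shows one common way to handle this by carrying the trailing partial line over to the next chunk. `LineSplitter` and `splitChunks` are hypothetical helpers, not part of the workflow package, and the sketch assumes chunks have already been decoded to strings.

```typescript
// Hypothetical helper: buffers the trailing partial line of each chunk so
// that rows split across two chunks are reassembled before being emitted.
class LineSplitter {
  private remainder = "";

  // Returns the complete, non-empty lines available after adding this chunk.
  push(chunk: string): string[] {
    const parts = (this.remainder + chunk).split("\n");
    // The last element is "" (chunk ended on a newline) or a partial line.
    this.remainder = parts.pop() ?? "";
    return parts.map((line) => line.replace(/\r$/, "")).filter(Boolean);
  }

  // Call once at end of input to emit a final line with no trailing newline.
  flush(): string[] {
    const last = this.remainder.replace(/\r$/, "");
    this.remainder = "";
    return last ? [last] : [];
  }
}

// Convenience wrapper: feed a sequence of chunks and collect every line.
function splitChunks(chunks: string[]): string[] {
  const splitter = new LineSplitter();
  const lines: string[] = [];
  for (const chunk of chunks) {
    lines.push(...splitter.push(chunk));
  }
  lines.push(...splitter.flush());
  return lines;
}
```

In a stream pipeline, `push` would be called from a transform callback and `flush` when the input ends. With Uint8Array chunks, decode first using a TextDecoder with `{ stream: true }` so that multi-byte characters split across chunks are also handled.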

export async function processLargeFileWorkflow(fileUrl: string) {
  "use workflow";

  const rawStream = await downloadFile(fileUrl);
  const processed = await transformStream(rawStream);
  await uploadResult(processed);
}

async function downloadFile(
  url: string
): Promise<ReadableStream<Uint8Array>> {
  "use step";

  const response = await fetch(url);
  if (!response.ok || !response.body) {
    // Fail loudly instead of returning a missing or error-page body
    throw new Error(`Download failed: ${response.status}`);
  }
  return response.body;
}

async function transformStream(
  input: ReadableStream<Uint8Array>
): Promise<ReadableStream<Uint8Array>> {
  "use step";

  return input.pipeThrough(
    new TransformStream<Uint8Array, Uint8Array>({
      transform(chunk, controller) {
        // Process each chunk without loading the full file
        controller.enqueue(chunk);
      },
    })
  );
}

async function uploadResult(stream: ReadableStream<Uint8Array>) {
  "use step";

  // Note: some runtimes (Node.js among them) also require `duplex: "half"`
  // in the request options when the body is a stream.
  const response = await fetch("https://storage.example.com/upload", {
    method: "POST",
    body: stream,
  });
  if (!response.ok) {
    throw new Error(`Upload failed: ${response.status}`);
  }
}

Event Ingestion Pipeline
Use createWebhook() to receive events from external systems. The workflow stays alive, waiting for each event and processing it durably.

import { createWebhook } from "workflow";

type IngestEvent = {
  source: string;
  type: string;
  payload: Record<string, unknown>;
};

// Attach ingestion metadata to the raw event.
async function enrichEvent(event: IngestEvent) {
  "use step";

  const enriched = {
    ...event,
    receivedAt: new Date().toISOString(),
    normalized: true,
  };

  return enriched;
}

async function writeToStore(event: Record<string, unknown>) {
  "use step";

  const response = await fetch("https://api.example.com/events", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(event),
  });
  if (!response.ok) {
    // Throw so that a failed write triggers a retry of this step
    throw new Error(`Event write failed: ${response.status}`);
  }
}

export async function ingestEventsWorkflow() {
  "use workflow";

  const webhook = createWebhook<IngestEvent>();

  // Each incoming request resumes the workflow for one loop iteration
  for await (const request of webhook) {
    const event = await request.json();
    const enriched = await enrichEvent(event);
    await writeToStore(enriched);
  }
}

The webhook is created once and iterated with for await. Each incoming HTTP request resumes the workflow, and the event is processed as a pair of durable steps. If enrichment succeeds but writing fails, only the write retries - the enriched data is preserved.
Webhooks implement AsyncIterable, so a single webhook can receive many requests over time. Use createHook() instead when you have your own ingestion layer that calls resumeHook() directly rather than sending HTTP requests.
Email Processing
Email processing is a natural fit for workflows. Parse the email in one step, extract structured data in another, and route based on content. Each step retries independently if it fails.

type ParsedEmail = {
  from: string;
  subject: string;
  body: string;
  hasAttachment: boolean;
};

async function parseEmail(rawEmail: string): Promise<ParsedEmail> {
  "use step";

  // Parse raw email into structured data (accepts both LF and CRLF line endings)
  const lines = rawEmail.split(/\r?\n/);

  // The first blank line separates headers from body; it may be absent
  const headerEnd = lines.indexOf("");
  const headers = headerEnd === -1 ? lines : lines.slice(0, headerEnd);

  const from = headers.find((l) => l.startsWith("From:"))?.slice(5).trim() ?? "";
  const subject =
    headers.find((l) => l.startsWith("Subject:"))?.slice(8).trim() ?? "";
  const body = headerEnd === -1 ? "" : lines.slice(headerEnd + 1).join("\n");

  return {
    from,
    subject,
    body,
    hasAttachment: rawEmail.includes("Content-Disposition: attachment"),
  };
}

async function classifyEmail(
  parsed: ParsedEmail
): Promise<"support" | "billing" | "general"> {
  "use step";

  // Classify based on subject and body content
  const text = `${parsed.subject} ${parsed.body}`.toLowerCase();
  if (text.includes("invoice") || text.includes("payment")) return "billing";
  if (text.includes("help") || text.includes("issue")) return "support";
  return "general";
}

async function routeEmail(parsed: ParsedEmail, category: string) {
  "use step";

  const response = await fetch("https://api.example.com/tickets", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      from: parsed.from,
      subject: parsed.subject,
      body: parsed.body,
      category,
    }),
  });
  if (!response.ok) {
    // Throw so that a failed ticket creation triggers a retry of this step
    throw new Error(`Ticket creation failed: ${response.status}`);
  }
}

export async function processEmailWorkflow(rawEmail: string) {
  "use workflow";

  const parsed = await parseEmail(rawEmail);
  const category = await classifyEmail(parsed);
  await routeEmail(parsed, category);

  return { from: parsed.from, category };
}

Resilient Data Extraction
When extracting data from external APIs, rate limits and transient failures are common. Use RetryableError to customize retry delays and FatalError to stop retrying on permanent failures.

import { FatalError, RetryableError, getStepMetadata } from "workflow";

type PriceData = {
  symbol: string;
  price: number;
  timestamp: string;
};

async function fetchPrice(symbol: string): Promise<PriceData> {
  "use step";

  const metadata = getStepMetadata();

  const response = await fetch(
    `https://api.example.com/prices/${symbol}`
  );

  if (response.status === 429) {
    // Rate limited - back off exponentially: the delay doubles with each attempt
    throw new RetryableError("Rate limited", {
      retryAfter: (2 ** metadata.attempt) * 1000,
    });
  }

  if (response.status === 404) {
    // Symbol doesn't exist - no point retrying
    throw new FatalError(`Unknown symbol: ${symbol}`);
  }

  if (!response.ok) {
    // Server error - default retry behavior
    throw new Error(`API error: ${response.status}`);
  }

  return response.json() as Promise<PriceData>;
}

async function storePrices(prices: PriceData[]) {
  "use step";

  const response = await fetch("https://api.example.com/prices/bulk", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ prices }),
  });
  if (!response.ok) {
    // Throw so that a failed write triggers a retry of this step
    throw new Error(`Bulk store failed: ${response.status}`);
  }
}

export async function extractPricesWorkflow(symbols: string[]) {
  "use workflow";

  const prices: PriceData[] = [];

  // Fetch sequentially; each call is recorded as its own step
  for (const symbol of symbols) {
    const price = await fetchPrice(symbol);
    prices.push(price);
  }

  await storePrices(prices);

  return { count: prices.length };
}

Each fetchPrice call is an independent step. If the API rate-limits the third symbol, the first two are already recorded and won't re-execute. The third step retries with exponential backoff until it succeeds or exhausts its retry budget.
The default retry limit is 3. Set fetchPrice.maxRetries = 10 after the function declaration to allow more attempts for flaky APIs. See Errors and Retries for the full retry API.
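
Once the retry limit is raised, the growth rate of the delay starts to matter, because an uncapped exponential schedule reaches multi-minute waits within a handful of attempts. The following is a minimal sketch of a capped exponential schedule whose result could be passed as the retryAfter value. `backoffDelayMs`, its default base of 1000 ms, and its 60-second cap are hypothetical choices, not part of the workflow package, and the sketch assumes the attempt counter starts at 1.

```typescript
// Hypothetical helper: exponential backoff in milliseconds, capped at capMs.
// Assumes `attempt` is 1 for the first execution of the step.
function backoffDelayMs(
  attempt: number,
  baseMs: number = 1000,
  capMs: number = 60000
): number {
  const uncapped = baseMs * 2 ** attempt; // doubles with every attempt
  return Math.min(capMs, uncapped);
}

// Delays for attempts 1..maxAttempts, useful for sanity-checking a retry budget.
function backoffSchedule(maxAttempts: number): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    delays.push(backoffDelayMs(attempt));
  }
  return delays;
}
```

With these defaults the delay reaches the cap from the sixth attempt onward, so ten attempts wait at most a few minutes in total instead of the roughly seventeen minutes an uncapped tenth delay alone would take.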
Batch Processing with Controlled Concurrency
When processing a large array of items, you may want to limit how many run concurrently to avoid overwhelming downstream services. Chunk the array and process each batch with Promise.all.

// processRecord is assumed to be a step function defined elsewhere
declare function processRecord(record: string): Promise<{ id: string }>;

export async function batchImportWorkflow(records: string[]) {
  "use workflow";

  const batchSize = 5;
  const results: Array<{ id: string }> = [];

  for (let i = 0; i < records.length; i += batchSize) {
    const batch = records.slice(i, i + batchSize);
    // Run the whole batch in parallel and wait for every item to finish
    const batchResults = await Promise.all(
      batch.map((record) => processRecord(record))
    );
    results.push(...batchResults);
  }

  return { processed: results.length };
}

Each batch of 5 items runs in parallel, and the next batch starts only after the current one completes. If a step in a batch fails, it retries independently without affecting the other items in the batch. If a step still fails after its retries are exhausted, Promise.all rejects and the error propagates to the workflow unless you catch it.
This pattern works well for database imports, API migrations, and any scenario where you need throughput without overwhelming a target system.
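
If the batch size comes from configuration rather than a constant, it is worth guarding against zero or negative values, since a loop that advances by `batchSize` never terminates when the size is 0. A small sketch of the chunking logic pulled into reusable helpers follows; `chunk` and `mapInBatches` are hypothetical names, not part of the workflow package.

```typescript
// Hypothetical helper: split an array into consecutive batches of at most `size`.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError(`batch size must be a positive integer, got ${size}`);
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Hypothetical helper: run `worker` over all items, at most `batchSize` at a time.
// Results are returned in input order; a rejected worker rejects the whole call.
async function mapInBatches<T, R>(
  items: readonly T[],
  batchSize: number,
  worker: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = [];
  for (const batch of chunk(items, batchSize)) {
    const batchResults = await Promise.all(batch.map((item) => worker(item)));
    results.push(...batchResults);
  }
  return results;
}
```

Inside a workflow, `mapInBatches(records, 5, processRecord)` would express the same loop as above. Because each batch waits for its slowest item, this caps concurrency but does not keep exactly five items in flight at all times.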
Related Documentation
- Streaming - Stream data between steps and to clients
- Errors and Retries - Customize retry behavior with FatalError and RetryableError
- Common Patterns - Sequential, parallel, timeout, and composition patterns
- createWebhook() API Reference - Receive external events in workflows
- getStepMetadata() API Reference - Access step metadata including attempt count
- Serialization - Understand which types can be passed between functions