Pipelines API
Compose and run DAGs of scraping, enrichment, and transformation steps under /api/pipelines.
A pipeline is a directed acyclic graph (DAG) of nodes that chains scraping, enrichment, and transformation steps. Submitting a pipeline starts the root jobs synchronously; downstream jobs are spawned as each predecessor reaches done.
All endpoints live under /api/pipelines and, except for the public GET /api/pipelines/schema, require an authenticated session. Mutating routes additionally require an active (non-suspended) account. Generic errors: 401 no session, 403 not owner / account suspended, 404 pipeline or node id unknown. Endpoint-specific causes are listed inline.
See also: Pipeline orchestration and Filter module.
Graph shape
The definition document describes the DAG. Edges are explicit and reference node ids; they are not inferred from any per-node inputs field.
{
"nodes": [
{"id": "n1", "type": "scrap", "config": {"queries": ["dentist"], "zones": ["Paris"]}, "x": 100, "y": 100},
{"id": "n2", "type": "emails", "config": {"mode": "normal"}, "x": 320, "y": 100},
{"id": "n3", "type": "verify", "config": {}, "x": 540, "y": 100}
],
"edges": [
{"id": "e1", "from": "n1", "to": "n2"},
{"id": "e2", "from": "n2", "to": "n3"}
]
}
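As an illustration of the explicit-edge shape, the sketch below assembles a definition for a simple linear chain. The helper name `linear_definition` and the `n1`/`e1` id scheme are assumptions made for this example; the API only requires that ids are unique and that edges reference existing node ids.

```python
def linear_definition(steps):
    """Build a {nodes, edges} definition from (node_type, config) pairs.

    Hypothetical helper: ids follow an n1, n2, ... / e1, e2, ... scheme
    chosen for this example only.
    """
    nodes = [
        {"id": f"n{i}", "type": node_type, "config": config}
        for i, (node_type, config) in enumerate(steps, start=1)
    ]
    # Each edge links one node to the next one in the chain.
    edges = [
        {"id": f"e{i}", "from": f"n{i}", "to": f"n{i + 1}"}
        for i in range(1, len(nodes))
    ]
    return {"nodes": nodes, "edges": edges}


definition = linear_definition([
    ("scrap", {"queries": ["dentist"], "zones": ["Paris"]}),
    ("emails", {"mode": "normal"}),
    ("verify", {}),
])
```

Coordinates are left out on purpose: the server lays out nodes that have no x/y.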
Node types
| Type | Role | Accepts (input) | Produces (output) |
|---|---|---|---|
| scrap | Google Maps scrape (root) | none | pois |
| import | CSV/Sheets import (root) | none | pois |
| reviews | Fetch reviews for POIs | pois_any | reviews |
| emails | Discover emails from websites | pois_any | pois_email |
| verify | SMTP verify emails | pois_email | verified |
| socials | Discover social profiles | pois_any | pois |
| dead_check | Detect inactive POIs | pois_any | pois |
| techstack | Detect website tech stack | pois_any | pois |
| ads_intelligence | Detect ad campaigns | pois_any | pois |
| brand_assets | Extract logo and brand assets | pois_any | pois |
| legal_ids | Find SIRET/SIREN from website | pois_any | pois |
| legal_data | Company data via api.gouv.fr | pois_any | pois |
| legal_mentions | Parse legal mentions page | pois_any | pois |
| phones_extra | Find extra phone numbers | pois_any | pois |
| pricing | Extract pricing/tariffs | pois_any | pois |
| pagespeed | Google PageSpeed scoring | pois_any | pois |
| phone_info | Phone line type / carrier (cache) | pois_any | pois |
| filter | Apply rule-based row filter | any_pois | passthrough |
| sort | Reorder rows by a column | any_pois | passthrough |
filter and sort preserve the upstream type; type compatibility is resolved by walking back to the nearest non-passthrough ancestor.
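The walk-back rule can be sketched as follows. This is a minimal illustration, not the server's code: `COMPAT` is copied from the example schema response shown later on this page, `NODE_IO` is a hand-written excerpt of the table above, and the function names and the `pred` mapping (node id to its single predecessor id) are assumptions. The graph is assumed to be acyclic.

```python
# Compat map as shown in the GET /api/pipelines/schema example response.
COMPAT = {
    "pois_any": ["pois", "pois_email"],
    "pois_email": ["pois_email"],
    "any_pois": ["pois", "pois_email", "verified"],
}

# Hypothetical excerpt of (input, output) per node type, from the table above.
NODE_IO = {
    "scrap": (None, "pois"),
    "emails": ("pois_any", "pois_email"),
    "verify": ("pois_email", "verified"),
    "filter": ("any_pois", "passthrough"),
    "sort": ("any_pois", "passthrough"),
}


def effective_output(node_id, nodes, pred):
    """Walk back through passthrough nodes to the nearest concrete output type."""
    current = node_id
    while current is not None:
        output = NODE_IO[nodes[current]][1]
        if output != "passthrough":
            return output
        current = pred.get(current)  # one predecessor at most (no fan-in)
    return None


def is_compatible(src_id, dst_id, nodes, pred):
    """True if the effective output of src is accepted by dst's input type."""
    accepted = COMPAT[NODE_IO[nodes[dst_id]][0]]
    return effective_output(src_id, nodes, pred) in accepted


nodes = {"n1": "scrap", "n2": "filter", "n3": "verify", "n4": "emails"}
pred = {"n2": "n1"}
```

Here `n2` (a filter) inherits `pois` from the scrape, so it can feed `emails` but not `verify`, which only accepts `pois_email`.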
Column guarantee
No module ever drops a column. Every enrichment and check node outputs all the columns it received, in their original order, plus its own columns appended at the end. This holds across an entire chain: a scrap → legal_ids → legal_data → emails pipeline ends with the Google Maps columns (lien_google_maps, note, nb_avis, lat, lon, …), the identifiers, the legal profile, and the emails side by side. Custom columns from an import are passed through untouched as well. The only explicit reductions a pipeline can ask for (a filter rule, a sort top_n cut) apply to rows, never columns.
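A client that consumes node outputs can check the column guarantee with a prefix comparison. The function name below is an assumption for this sketch; it only tests the property stated above (input columns kept, in order, at the start of the output header).

```python
def respects_column_guarantee(input_columns, output_columns):
    """True if output starts with every input column, in the original order."""
    n = len(input_columns)
    return list(output_columns[:n]) == list(input_columns)


upstream = ["nom", "site_web", "telephone"]
after_emails = ["nom", "site_web", "telephone", "email", "email_personal"]
```

With these headers, `respects_column_guarantee(upstream, after_emails)` holds, while a header that drops or reorders an upstream column would fail the check.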
The full machine-readable contract for every node (category, input/output, the needs/produces columns, and each block's config_schema) is served live at GET /api/pipelines/schema. That endpoint is the single source of truth; the table above is a summary.
Validation rules
The server rejects a definition with HTTP 400 if any of the following hold:
| Rule | Error message |
|---|---|
| Empty nodes list | Pipeline vide |
| More than 20 nodes | Trop de nodes (max 20) |
| Duplicate node id | IDs de nodes en doublon |
| Unknown type | Type de node inconnu : ... |
| Edge endpoint references missing node | Edge référence un node inexistant |
| Self-loop (from == to) | Edge vers soi-même interdit |
| Root type connected as a successor | Le node '...' ne peut pas avoir de prédécesseur |
| Incompatible output to input | Connexion X → Y incompatible |
| Node with more than one predecessor (MVP limit) | Le node ... a plusieurs prédécesseurs |
| Missing required config field | Node '...' : champ requis manquant « ... » |
| Wrong config field type / bad enum value | Node '...', champ « ... » : ... |
Roots must be one of scrap or import. Fan-out (one node feeding several successors) is allowed; fan-in is not.
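The structural rules above can be pre-checked on the client before submitting. The sketch below is an assumption-laden illustration, not the server's validator: the function name and the English labels are invented here, and it deliberately skips the unknown-type, compatibility, and config rules, which depend on the live schema.

```python
ROOT_TYPES = {"scrap", "import"}
MAX_NODES = 20


def structural_errors(definition):
    """Return rule violations as English labels (not the server's messages)."""
    nodes = definition.get("nodes", [])
    edges = definition.get("edges", [])
    if not nodes:
        return ["empty nodes list"]
    errors = []
    if len(nodes) > MAX_NODES:
        errors.append("more than 20 nodes")
    ids = [n["id"] for n in nodes]
    if len(ids) != len(set(ids)):
        errors.append("duplicate node id")
    types = {n["id"]: n["type"] for n in nodes}
    pred_count = {}
    for edge in edges:
        src, dst = edge["from"], edge["to"]
        if src not in types or dst not in types:
            errors.append("edge references missing node")
            continue
        if src == dst:
            errors.append("self-loop")
            continue
        if types[dst] in ROOT_TYPES:
            errors.append(f"root node {dst} cannot have a predecessor")
        pred_count[dst] = pred_count.get(dst, 0) + 1
    for node_id, count in pred_count.items():
        if count > 1:  # fan-in is not allowed (MVP limit)
            errors.append(f"node {node_id} has several predecessors")
    # Roots must be scrap or import, so any other type needs a predecessor.
    for node_id, node_type in types.items():
        if node_id not in pred_count and node_type not in ROOT_TYPES:
            errors.append(f"node {node_id} has no predecessor but is not a root type")
    return errors
```

An empty list means the definition passed these checks only; POST /api/pipelines/validate remains the authoritative answer.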
Portable envelope
Pipelines export and import as a single self-describing JSON envelope. The same shape is produced by the editor's Export button and accepted by Import and AI generation:
{
"schema_version": 1,
"name": "Dentists Paris",
"definition": { "nodes": [...], "edges": [...] },
"meta": { "exported_from": "outsend.xyz", "kind": "pipeline" }
}
POST /api/pipelines and POST /api/pipelines/validate accept either the full envelope or a bare definition. Before validation the server normalizes the definition: it generates missing edge ids, auto-lays out nodes that have no x/y, applies config_schema defaults, coerces newline-separated strings into string[] fields, and strips config fields not in the schema. This means a minimal hand-written or AI-generated {nodes, edges} (no coordinates, no edge ids) is accepted as-is.
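To make the normalization steps concrete, here is a rough sketch of what such a pass could look like. It is not the server's implementation: the function name, the generated edge id scheme, and the left-to-right layout coordinates are assumptions, and `node_schemas` stands for the `nodes` object returned by GET /api/pipelines/schema.

```python
import copy


def normalize(definition, node_schemas):
    """Return a normalized copy of a {nodes, edges} definition (sketch)."""
    out = copy.deepcopy(definition)

    # 1. Generate missing edge ids without colliding with existing ones.
    edges = out.get("edges", [])
    used = {e["id"] for e in edges if "id" in e}
    counter = 1
    for edge in edges:
        if "id" not in edge:
            while f"e{counter}" in used:
                counter += 1
            edge["id"] = f"e{counter}"  # hypothetical id scheme
            used.add(edge["id"])

    for index, node in enumerate(out.get("nodes", [])):
        # 2. Auto-lay out nodes that have no coordinates (hypothetical grid).
        if "x" not in node or "y" not in node:
            node["x"], node["y"] = 100 + 220 * index, 100

        schema = node_schemas.get(node["type"], {}).get("config_schema", {})
        given = node.get("config", {})
        config = {}
        for field, spec in schema.items():
            if field in given:
                value = given[field]
                # 3. Coerce newline-separated strings into string[] fields.
                if spec.get("type") == "string[]" and isinstance(value, str):
                    value = [s.strip() for s in value.splitlines() if s.strip()]
                config[field] = value
            elif "default" in spec:
                # 4. Apply config_schema defaults.
                config[field] = spec["default"]
        # 5. Fields absent from the schema are dropped by construction.
        node["config"] = config
    return out
```

The input is deep-copied, so the caller's definition is left unchanged.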
GET /api/pipelines/schema
Return the canonical, machine-readable pipeline schema — the single source of truth shared by the editor, import, AI generation, and the planned MCP create_pipeline tool. Public (no auth): it is format documentation, identical for every caller.
Response 200
{
"schema_version": 1,
"compat": { "pois_any": ["pois", "pois_email"], "pois_email": ["pois_email"], "any_pois": ["pois", "pois_email", "verified"] },
"root_types": ["import", "scrap"],
"nodes": {
"scrap": {
"category": "source", "is_root": true,
"input": null, "output": "pois",
"needs": [], "produces": ["nom", "site_web", "telephone", "..."],
"config_schema": {
"queries": {"type": "string[]", "required": true, "label": "..."},
"zones": {"type": "string[]", "required": true, "label": "..."}
}
},
"emails": { "category": "enrich", "input": "pois_any", "output": "pois_email",
"needs": ["site_web"], "produces": ["email", "email_personal"],
"config_schema": {"mode": {"type": "enum", "enum": ["normal", "deep"], "default": "normal"}} }
}
}
config_schema field types: string, string[], int, float, bool, enum (with enum list), object. required and default are optional per field.
POST /api/pipelines/validate
Normalize and validate a definition without creating or running anything. Used by Import (review before launch) and AI generation (check the JSON Claude produced). Requires a session.
Request body
{ "definition": { "nodes": [...], "edges": [...] }, "schema_version": 1 }
Response 200 — valid
{
"ok": true,
"definition": { "nodes": [...with ids, x/y, defaults...], "edges": [...] },
"summary": { "n_nodes": 3, "n_edges": 2, "types": ["scrap", "emails", "verify"] }
}
Response 200 — invalid (note: still HTTP 200, with ok: false)
{ "ok": false, "error": "Connexion scrap → verify incompatible" }
The returned definition is the normalized form, ready to load into the editor or submit verbatim to POST /api/pipelines.
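Because an invalid definition still comes back as HTTP 200, a client has to inspect `ok` rather than the status code. The sketch below shows one way to build the request and interpret the body with the standard library. The base URL, the use of a `Cookie` header to carry the session, and both function names are assumptions; the page does not specify how the session is transmitted.

```python
import json
import urllib.request


def build_validate_request(base_url, definition, session_cookie):
    """Prepare (but do not send) a POST /api/pipelines/validate request."""
    body = json.dumps({"definition": definition, "schema_version": 1}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/pipelines/validate",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Cookie": session_cookie,  # assumption: cookie-based session
        },
    )


def normalized_or_raise(payload):
    """Return the normalized definition, or raise on ok: false."""
    if not payload.get("ok"):
        raise ValueError(payload.get("error", "validation failed"))
    return payload["definition"]
```

Sending the request would then be `json.load(urllib.request.urlopen(req))`, with the result passed to `normalized_or_raise`.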
Generate a pipeline with any AI
You don't need to write the JSON by hand. Two ways:
1. Built-in (inside outsend). The editor's 🤖 Build with AI button sends the schema above plus your plain-language description to Claude using your own key (BYOK), then validates and lays out the result.
2. Bring-your-own assistant (copy/paste anywhere). Open any AI assistant — claude.ai, Claude Desktop, Cursor, ChatGPT — and:
- Paste the contract: either this page, or the whole docs bundle at /docs/llms-full.txt, or just the JSON of GET /api/pipelines/schema.
- Add your request, e.g. "Compose an outsend pipeline that finds dentists in Berlin, gets their emails, verifies deliverability, and keeps the top-rated. Return only the JSON envelope."
- The assistant returns a {schema_version, name, definition} envelope. Paste it into the editor's Import dialog (it is validated before anything runs), or POST it to /api/pipelines.
Because the editor, import, and this API all accept the same envelope and the server normalizes it (missing edge ids, coordinates, and config defaults are filled in), a hand-assembled {nodes, edges} works without any layout fields.
POST /api/pipelines
Create a pipeline and launch its root jobs.
Request body
{
"name": "Dentists Paris",
"definition": { "nodes": [...], "edges": [...] }
}
name is optional (≤ 120 chars, defaults to "Pipeline"). The definition is normalized (see Portable envelope) before validation, so a minimal {nodes, edges} works.
Response 201
{
"id": "f1a2…-uuid",
"status": "running",
"initial_jobs": ["job_abc", "job_def"]
}
Specific cause: 400 definition fails any validation rule, or root job creation fails. On root failure the pipeline is persisted with status = failed.
GET /api/pipelines
List the caller's pipelines (most recent first, capped at 50).
Response 200
[
{
"id": "f1a2…",
"name": "Dentists Paris",
"status": "running",
"created_at": "2026-05-27 10:14:02",
"completed_at": null,
"nodes_count": 3,
"done_count": 1,
"results_count": 187,
"progress_pct": 42
}
]
status is one of pending | running | done | failed | cancelled. nodes_count is derived from the stored definition. done_count is the number of stages already finished, results_count the rows aggregated across all stages so far, and progress_pct (0–100) a duration-weighted completion estimate — transform stages (filter, sort) count far less than scraping/enrichment stages, and the in-flight stage contributes its real sub-progress. The same progress_pct is also returned by GET /api/pipelines/{id}.
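The idea of a weighted estimate can be illustrated as below. The actual weights and formula are not documented on this page, so the values 10 and 1, the function name, and the input shape are purely hypothetical; the sketch only shows how transform stages can count for less and how an in-flight stage can contribute a fraction.

```python
TRANSFORM_TYPES = {"filter", "sort"}


def progress_pct(stages, heavy_weight=10.0, light_weight=1.0):
    """Weighted completion estimate in 0..100 (hypothetical weights).

    stages: list of (node_type, fraction_done) with fraction_done in [0, 1];
    finished stages use 1.0 and the in-flight stage its real sub-progress.
    """
    total = 0.0
    done = 0.0
    for node_type, fraction in stages:
        weight = light_weight if node_type in TRANSFORM_TYPES else heavy_weight
        total += weight
        done += weight * min(max(fraction, 0.0), 1.0)
    return round(100 * done / total) if total else 0
```

Under these invented weights, a finished scrape followed by a pending filter and a pending emails stage is already close to half done, because the filter barely counts.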
GET /api/pipelines/{id}
Return a single pipeline with its definition and the jobs spawned so far.
Response 200
{
"id": "f1a2…",
"user_id": 42,
"name": "Dentists Paris",
"definition": { "nodes": [...], "edges": [...] },
"status": "running",
"created_at": "2026-05-27 10:14:02",
"completed_at": null,
"progress_pct": 42,
"jobs": [
{
"id": "job_abc",
"job_type": "scrap",
"status": "done",
"pipeline_node_id": "n1",
"results_count": 187,
"error_message": null,
"created_at": "2026-05-27 10:14:02",
"completed_at": "2026-05-27 10:18:55"
}
],
"output_job": {
"id": "job_xyz",
"job_type": "verify_emails",
"results_count": 142,
"status": "done",
"download_available": true
}
}
output_job is the pipeline's final dataset: the output of the most-downstream stage that has actually produced rows. A pipeline filters and reduces rather than sums, so output_job.results_count matches the headline count, not the sum of all stages. Download it via GET /api/jobs/{id}/download using output_job.id, in csv / xlsx / json. output_job is null while the pipeline has produced nothing downloadable yet. download_available reflects whether a CSV is still on disk and unexpired; the CSV may be final or partial, so downloads also work for running or stopped pipelines.
PATCH /api/pipelines/{id}
Not implemented. The current API does not expose graph mutation after creation; clone the pipeline by re-issuing POST /api/pipelines with an updated definition. Returns 405 Method Not Allowed.
DELETE /api/pipelines/{id}
Not implemented. Pipelines are immutable once created; deletion will be added once a retention policy is defined. Returns 405 Method Not Allowed.
POST /api/pipelines/{id}/run
Not implemented. Pipelines start automatically when created via POST /api/pipelines; there is no separate run endpoint. To re-execute an existing graph, post it again as a new pipeline.
GET /api/pipelines/{id}/nodes/{node_id}/input-columns
Inspect the schema of the CSV that will feed a given node. Useful for building filter UIs.
Behaviour. The endpoint locates the node's most recent predecessor job. If the predecessor is not yet done, the response carries an empty columns list and a reason code. Otherwise the predecessor's output CSV is read (up to 5000 rows) and each column is profiled for type, fill rate, and sample values.
Response 200 — predecessor done
{
"columns": [
{
"name": "telephone",
"type": "phone",
"fill_rate": 0.92,
"sample_values": ["+33 1 23 45 67 89", "0612345678"],
"distinct_count": null
},
{
"name": "categorie",
"type": "category",
"fill_rate": 1.0,
"sample_values": ["dentiste", "orthodontiste"],
"distinct_count": 4,
"distinct_values": ["dentiste", "endodontiste", "orthodontiste", "stomatologue"]
}
],
"row_count": 187,
"predecessor_job_id": "job_abc"
}
type is one of phone | email | url | number | category | text. A column is tagged category only if it has between 1 and 200 distinct non-empty values; otherwise it falls back to text. A typed verdict requires ≥ 80% of non-empty values to match the corresponding pattern.
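The typing rule can be sketched as follows. Only the 80% threshold, the 1 to 200 distinct-value window, and the type names come from this page; the regular expressions, the order in which types are tried, and the function name are assumptions, and the server's real patterns may differ.

```python
import re

# Hypothetical patterns, for illustration only.
PATTERNS = {
    "email": re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$"),
    "url": re.compile(r"^https?://\S+$", re.IGNORECASE),
    "phone": re.compile(r"^(?:\+|0)[\d\s()-]{8,}$"),
    "number": re.compile(r"^-?\d+(?:[.,]\d+)?$"),
}


def infer_type(values, threshold=0.8, max_categories=200):
    """Classify a column from its raw string values."""
    non_empty = [v.strip() for v in values if v and v.strip()]
    if not non_empty:
        return "text"
    # A typed verdict needs at least `threshold` of non-empty values to match.
    for name in ("email", "url", "phone", "number"):
        hits = sum(1 for v in non_empty if PATTERNS[name].match(v))
        if hits / len(non_empty) >= threshold:
            return name
    # Otherwise: category if the distinct count is small enough, else text.
    if 1 <= len(set(non_empty)) <= max_categories:
        return "category"
    return "text"
```

Empty values are ignored when computing the match ratio, which mirrors the "non-empty values" wording above.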
Response 200 — no usable input
{ "columns": [], "reason": "no_predecessor" }
| reason | Meaning |
|---|---|
| no_predecessor | The node is a root, or has no incoming edge yet. |
| no_data_yet | Predecessor job exists but is not in status done. |
| no_csv_found | Predecessor finished but no output CSV is on disk. |
| csv_read_error | The CSV file could not be parsed. |
POST /api/pipelines/{id}/nodes/{node_id}/filter-preview
Apply a set of filter rules in memory against the upstream CSV and return the match count plus a small sample. No job is created; no state is mutated.
The target node must be of type filter. The body uses the same rules shape that filter nodes persist in their config.rules; previews are computed by the same function the worker uses at execution time, so the count is authoritative for the data inspected.
Request body
{
"rules": {
"logic": "AND",
"conditions": [
{"column": "fill_rate", "op": ">=", "value": 0.5},
{"column": "categorie", "op": "in", "value": ["dentiste", "orthodontiste"]}
]
}
}
The exact rule grammar is defined by the filter module (see Filter module).
Response 200
{
"total": 187,
"matched": 73,
"samples": [
{"nom": "Cabinet Dupont", "telephone": "0123456789", "categorie": "dentiste"}
],
"predecessor_job_id": "job_abc",
"fieldnames": ["nom", "telephone", "categorie", "site_web"],
"capped": false
}
samples contains up to 5 matched rows with empty fields stripped. capped is true when the upstream CSV exceeded the 5000-row preview limit — in that case total reflects only the inspected window, but the matched/total ratio remains representative.
When the predecessor is not ready, the response is the same {total, matched, samples, reason} skeleton with all counts at 0. Possible reason codes mirror the input-columns endpoint: no_predecessor, no_data_yet, no_csv_found.
Specific causes: 400 target node is not of type filter, or rule application raised; 500 CSV could not be read.
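The preview computation can be approximated in a few lines. This sketch is not the filter module: it supports only the two operators that appear in the request example (`>=` and `in`), treats any logic value other than AND as OR (an assumption), and uses invented function names. The 5000-row window and the 5-sample cap come from the text above.

```python
PREVIEW_LIMIT = 5000
MAX_SAMPLES = 5


def _matches(row, cond):
    """Evaluate one condition; only '>=' and 'in' are handled in this sketch."""
    value = row.get(cond["column"], "")
    if cond["op"] == "in":
        return value in cond["value"]
    if cond["op"] == ">=":
        try:
            return float(value) >= float(cond["value"])
        except (TypeError, ValueError):
            return False  # non-numeric cells never match a numeric rule
    raise ValueError(f"unsupported op in this sketch: {cond['op']}")


def preview(rows, rules):
    """Return total, matched, samples and capped for a list of row dicts."""
    window = rows[:PREVIEW_LIMIT]
    combine = all if rules.get("logic", "AND") == "AND" else any
    matched = [
        row for row in window
        if combine(_matches(row, cond) for cond in rules["conditions"])
    ]
    samples = [
        {k: v for k, v in row.items() if v not in ("", None)}  # strip empty fields
        for row in matched[:MAX_SAMPLES]
    ]
    return {
        "total": len(window),
        "matched": len(matched),
        "samples": samples,
        "capped": len(rows) > PREVIEW_LIMIT,
    }
```

For example, with rules requiring `note >= 4.5` and `categorie in ["dentiste", "orthodontiste"]`, only rows satisfying both conditions are counted in `matched`.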
Lifecycle summary
- POST /api/pipelines validates the graph, persists the pipeline as running, and spawns one job per root node.
- As each job reaches done, the worker reads its CSV, transforms rows for the successor's input type, and creates the next job. Empty outputs short-circuit the branch.
- When every spawned job has reached a terminal status (done, failed, cancelled, expired), the pipeline is finalized as done if all succeeded, otherwise failed.
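The finalization rule in the last step can be written as a small function. The function name and the choice to return None while the pipeline is still in progress are assumptions for this sketch; the terminal statuses and the done/failed outcome follow the summary above.

```python
TERMINAL_STATUSES = {"done", "failed", "cancelled", "expired"}


def final_status(job_statuses):
    """Return 'done' or 'failed' once all jobs are terminal, else None."""
    if not job_statuses:
        return None  # nothing spawned yet
    if any(status not in TERMINAL_STATUSES for status in job_statuses):
        return None  # at least one job is still pending or running
    return "done" if all(status == "done" for status in job_statuses) else "failed"
```

A single failed, cancelled, or expired job is enough to finalize the whole pipeline as failed once the remaining jobs have stopped.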