Pipelines API

Compose and run DAGs of scraping, enrichment, and transformation steps under /api/pipelines.

A pipeline is a directed acyclic graph (DAG) of nodes that chains scraping, enrichment, and transformation steps. Submitting a pipeline starts the root jobs synchronously; downstream jobs are spawned as each predecessor reaches done.

All endpoints live under /api/pipelines. Every endpoint except the public GET /api/pipelines/schema requires an authenticated session. Mutating routes additionally require an active (non-suspended) account. Generic errors: 401 no session, 403 not owner / account suspended, 404 pipeline or node id unknown. Endpoint-specific causes are listed inline.

See also: Pipeline orchestration and Filter module.

Graph shape

The definition document describes the DAG. Edges are explicit and reference node ids; they are not inferred from any per-node inputs field.

{
  "nodes": [
    {"id": "n1", "type": "scrap",  "config": {"queries": ["dentist"], "zones": ["Paris"]}, "x": 100, "y": 100},
    {"id": "n2", "type": "emails", "config": {"mode": "normal"}, "x": 320, "y": 100},
    {"id": "n3", "type": "verify", "config": {}, "x": 540, "y": 100}
  ],
  "edges": [
    {"id": "e1", "from": "n1", "to": "n2"},
    {"id": "e2", "from": "n2", "to": "n3"}
  ]
}

Node types

| Type | Role | Accepts (input) | Produces (output) |
|---|---|---|---|
| scrap | Google Maps scrape (root) | none | pois |
| import | CSV/Sheets import (root) | none | pois |
| reviews | Fetch reviews for POIs | pois_any | reviews |
| emails | Discover emails from websites | pois_any | pois_email |
| verify | SMTP verify emails | pois_email | verified |
| socials | Discover social profiles | pois_any | pois |
| dead_check | Detect inactive POIs | pois_any | pois |
| techstack | Detect website tech stack | pois_any | pois |
| ads_intelligence | Detect ad campaigns | pois_any | pois |
| brand_assets | Extract logo and brand assets | pois_any | pois |
| legal_ids | Find SIRET/SIREN from website | pois_any | pois |
| legal_data | Company data via api.gouv.fr | pois_any | pois |
| legal_mentions | Parse legal mentions page | pois_any | pois |
| phones_extra | Find extra phone numbers | pois_any | pois |
| pricing | Extract pricing/tariffs | pois_any | pois |
| pagespeed | Google PageSpeed scoring | pois_any | pois |
| phone_info | Phone line type / carrier (cache) | pois_any | pois |
| filter | Apply rule-based row filter | any_pois | passthrough |
| sort | Reorder rows by a column | any_pois | passthrough |

filter and sort preserve the upstream type; type compatibility is resolved by walking back to the nearest non-passthrough ancestor.
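
The walk-back rule can be sketched as follows. This is a minimal illustration, not the server's code: COMPAT is copied from the compat object returned by GET /api/pipelines/schema, NODE_IO is a subset of the table above, and the function names and the parent map (node id to its single predecessor) are hypothetical.

```python
# Compatibility classes, copied from the "compat" object of GET /api/pipelines/schema.
COMPAT = {
    "pois_any": ["pois", "pois_email"],
    "pois_email": ["pois_email"],
    "any_pois": ["pois", "pois_email", "verified"],
}

# (accepted input, produced output) per node type; subset of the Node types table.
NODE_IO = {
    "scrap": (None, "pois"),
    "emails": ("pois_any", "pois_email"),
    "verify": ("pois_email", "verified"),
    "filter": ("any_pois", "passthrough"),
    "sort": ("any_pois", "passthrough"),
}


def effective_output(node_id, types, parent):
    """Walk back through passthrough nodes to the nearest concrete output type."""
    current = node_id
    while current is not None:
        output = NODE_IO[types[current]][1]
        if output != "passthrough":
            return output
        current = parent.get(current)  # hypothetical map: node id -> predecessor id
    return None


def is_compatible(src_id, dst_id, types, parent):
    """True when the source's effective output is accepted by the destination."""
    accepted = NODE_IO[types[dst_id]][0]
    if accepted is None:
        return False  # root types accept no predecessor
    return effective_output(src_id, types, parent) in COMPAT[accepted]
```

With a scrap node feeding a filter, the filter's effective output is pois, so it may feed emails but not verify.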

Column guarantee

No module ever drops a column. Every enrichment and check node outputs all the columns it received, in their original order, plus its own columns appended at the end. This holds across an entire chain: a scrap → legal_ids → legal_data → emails pipeline ends with the Google Maps columns (lien_google_maps, note, nb_avis, lat, lon, …), the identifiers, the legal profile, and the emails side by side. Custom columns from an import are passed through untouched as well. Nothing disappears except through an explicit transformation the pipeline asks for, and those (a filter rule, a sort top_n cut) apply to rows, never columns.
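
The ordering guarantee can be pictured with a one-line sketch. The helper name is hypothetical, and skipping a produced column that already exists upstream is an assumption; the page does not say how such a collision is handled.

```python
def output_columns(received, produced):
    """Received columns first, in their original order, then the node's own columns."""
    # Assumption: a produced column already present upstream is not duplicated.
    return list(received) + [c for c in produced if c not in received]
```

For example, output_columns(["nom", "site_web", "note"], ["email", "email_personal"]) keeps the three upstream columns in place and appends the two email columns.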

The full machine-readable contract for every node (category, input/output, the needs/produces columns, and each block's config_schema) is served live at GET /api/pipelines/schema. That endpoint is the single source of truth; the Node types table above is a summary.

Validation rules

The server rejects a definition with HTTP 400 if any of the following hold:

| Rule | Error message |
|---|---|
| Empty nodes list | Pipeline vide |
| More than 20 nodes | Trop de nodes (max 20) |
| Duplicate node id | IDs de nodes en doublon |
| Unknown type | Type de node inconnu : ... |
| Edge endpoint references missing node | Edge référence un node inexistant |
| Self-loop (from == to) | Edge vers soi-même interdit |
| Root type connected as a successor | Le node '...' ne peut pas avoir de prédécesseur |
| Incompatible output to input | Connexion X → Y incompatible |
| Node with more than one predecessor (MVP limit) | Le node ... a plusieurs prédécesseurs |
| Missing required config field | Node '...' : champ requis manquant « ... » |
| Wrong config field type / bad enum value | Node '...', champ « ... » : ... |

Roots must be one of scrap or import. Fan-out (one node feeding several successors) is allowed; fan-in is not.
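
A client can catch the purely structural rules before calling the server. The sketch below covers only a subset (size limits, duplicate ids, dangling edges, self-loops, roots as successors, fan-in); the function name and the English labels it returns are illustrative and are not the server's messages. The server remains authoritative.

```python
from collections import Counter

MAX_NODES = 20
ROOT_TYPES = {"scrap", "import"}


def precheck(definition):
    """Return a label for the first structural problem found, or None."""
    nodes = definition.get("nodes", [])
    edges = definition.get("edges", [])
    if not nodes:
        return "empty nodes list"
    if len(nodes) > MAX_NODES:
        return "more than 20 nodes"
    ids = [n["id"] for n in nodes]
    if len(set(ids)) != len(ids):
        return "duplicate node id"
    types = {n["id"]: n["type"] for n in nodes}
    for edge in edges:
        if edge["from"] not in types or edge["to"] not in types:
            return "edge references missing node"
        if edge["from"] == edge["to"]:
            return "self-loop"
        if types[edge["to"]] in ROOT_TYPES:
            return "root type has a predecessor"
    # Fan-out is allowed, fan-in is not: no node may be the target of two edges.
    fan_in = Counter(edge["to"] for edge in edges)
    if any(count > 1 for count in fan_in.values()):
        return "node with more than one predecessor"
    return None
```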

Portable envelope

Pipelines export and import as a single self-describing JSON envelope. The same shape is produced by the editor's Export button and accepted by Import and AI generation:

{
  "schema_version": 1,
  "name": "Dentists Paris",
  "definition": { "nodes": [...], "edges": [...] },
  "meta": { "exported_from": "outsend.xyz", "kind": "pipeline" }
}

POST /api/pipelines and POST /api/pipelines/validate accept either the full envelope or a bare definition. Before validation the server normalizes the definition: it generates missing edge ids, auto-lays out nodes that have no x/y, applies config_schema defaults, coerces newline-separated strings into string[] fields, and strips config fields not in the schema. This means a minimal hand-written or AI-generated {nodes, edges} (no coordinates, no edge ids) is accepted as-is.
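
The normalization steps listed above can be approximated client-side as follows. This is a sketch under assumptions: the grid spacing, the generated edge id format (e1, e2, …), and the function name are invented for illustration, and schema_nodes stands for the nodes object returned by GET /api/pipelines/schema.

```python
import copy


def normalize(definition, schema_nodes):
    """Approximate the documented normalization; not the server's implementation."""
    out = copy.deepcopy(definition)
    for index, node in enumerate(out.get("nodes", [])):
        # Auto-layout: nodes without coordinates go on a horizontal row (assumed spacing).
        if "x" not in node or "y" not in node:
            node["x"], node["y"] = 100 + 220 * index, 100
        fields = schema_nodes.get(node["type"], {}).get("config_schema", {})
        config = node.get("config") or {}
        cleaned = {}
        for name, spec in fields.items():
            if name in config:
                value = config[name]
                # Coerce "a\nb" into ["a", "b"] for string[] fields.
                if spec["type"] == "string[]" and isinstance(value, str):
                    value = [line.strip() for line in value.splitlines() if line.strip()]
                cleaned[name] = value
            elif "default" in spec:
                cleaned[name] = spec["default"]
        node["config"] = cleaned  # fields absent from the schema are dropped
    # Generate missing edge ids (assumed format; collisions are not handled here).
    for index, edge in enumerate(out.setdefault("edges", []), start=1):
        edge.setdefault("id", f"e{index}")
    return out
```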


GET /api/pipelines/schema

Return the canonical, machine-readable pipeline schema — the single source of truth shared by the editor, import, AI generation, and the planned MCP create_pipeline tool. Public (no auth): it is format documentation, identical for every caller.

Response 200

{
  "schema_version": 1,
  "compat": { "pois_any": ["pois", "pois_email"], "pois_email": ["pois_email"], "any_pois": ["pois", "pois_email", "verified"] },
  "root_types": ["import", "scrap"],
  "nodes": {
    "scrap": {
      "category": "source", "is_root": true,
      "input": null, "output": "pois",
      "needs": [], "produces": ["nom", "site_web", "telephone", "..."],
      "config_schema": {
        "queries": {"type": "string[]", "required": true, "label": "..."},
        "zones":   {"type": "string[]", "required": true, "label": "..."}
      }
    },
    "emails": { "category": "enrich", "input": "pois_any", "output": "pois_email",
                "needs": ["site_web"], "produces": ["email", "email_personal"],
                "config_schema": {"mode": {"type": "enum", "enum": ["normal", "deep"], "default": "normal"}} }
  }
}

config_schema field types: string, string[], int, float, bool, enum (with enum list), object. required and default are optional per field.
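
To show how the field types might be checked on the client, here is a small sketch. The function names are hypothetical, and two choices are assumptions rather than documented behaviour: an int is accepted where a float is expected, and a bool is rejected where a number is expected.

```python
def check_field(value, spec):
    """Return True when value matches one config_schema field spec."""
    kind = spec["type"]
    if kind == "string":
        return isinstance(value, str)
    if kind == "string[]":
        return isinstance(value, list) and all(isinstance(v, str) for v in value)
    if kind == "int":
        return isinstance(value, int) and not isinstance(value, bool)
    if kind == "float":
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if kind == "bool":
        return isinstance(value, bool)
    if kind == "enum":
        return value in spec.get("enum", [])
    if kind == "object":
        return isinstance(value, dict)
    return False  # unknown type name


def config_errors(config, config_schema):
    """List missing required fields and fields whose value has the wrong type."""
    errors = []
    for name, spec in config_schema.items():
        if name not in config:
            if spec.get("required"):
                errors.append(f"{name}: missing required field")
        elif not check_field(config[name], spec):
            errors.append(f"{name}: expected {spec['type']}")
    return errors
```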


POST /api/pipelines/validate

Normalize and validate a definition without creating or running anything. Used by Import (review before launch) and AI generation (check the JSON Claude produced). Requires a session.

Request body

{ "definition": { "nodes": [...], "edges": [...] }, "schema_version": 1 }

Response 200 — valid

{
  "ok": true,
  "definition": { "nodes": [...with ids, x/y, defaults...], "edges": [...] },
  "summary": { "n_nodes": 3, "n_edges": 2, "types": ["scrap", "emails", "verify"] }
}

Response 200 — invalid (note: still HTTP 200, with ok: false)

{ "ok": false, "error": "Connexion scrap → verify incompatible" }

The returned definition is the normalized form, ready to load into the editor or submit verbatim to POST /api/pipelines.
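
Because an invalid definition still comes back as HTTP 200, client code has to inspect ok rather than the status code. A minimal sketch, with a hypothetical exception class and helper name:

```python
class PipelineInvalid(ValueError):
    """Raised when /api/pipelines/validate answers with ok: false."""


def unwrap_validation(body):
    """Return the normalized definition, or raise with the server's error text."""
    if not body.get("ok"):
        raise PipelineInvalid(body.get("error", "unknown validation error"))
    return body["definition"]
```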


Generate a pipeline with any AI

You don't need to write the JSON by hand. Two ways:

1. Built-in (inside outsend). The editor's 🤖 Build with AI button sends the schema above plus your plain-language description to Claude using your own key (BYOK), then validates and lays out the result.

2. Bring-your-own assistant (copy/paste anywhere). Open any AI assistant — claude.ai, Claude Desktop, Cursor, ChatGPT — and:

  1. Paste the contract: either this page, or the whole docs bundle at /docs/llms-full.txt, or just the JSON of GET /api/pipelines/schema.
  2. Add your request, e.g. "Compose an outsend pipeline that finds dentists in Berlin, gets their emails, verifies deliverability, and keeps the top-rated. Return only the JSON envelope."
  3. The assistant returns a {schema_version, name, definition} envelope. Paste it into the editor's Import dialog (it is validated before anything runs), or POST it to /api/pipelines.

Because the editor, import, and this API all accept the same envelope and the server normalizes it (missing edge ids, coordinates, and config defaults are filled in), a hand-assembled {nodes, edges} works without any layout fields.


POST /api/pipelines

Create a pipeline and launch its root jobs.

Request body

{
  "name": "Dentists Paris",
  "definition": { "nodes": [...], "edges": [...] }
}

name is optional (≤ 120 chars, defaults to "Pipeline"). The definition is normalized (see Portable envelope) before validation, so a minimal {nodes, edges} works.

Response 201

{
  "id": "f1a2…-uuid",
  "status": "running",
  "initial_jobs": ["job_abc", "job_def"]
}

Specific causes: 400 if the definition fails any validation rule, or if root job creation fails. On root failure the pipeline is still persisted, with status = failed.
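
A request for this endpoint could be assembled with the standard library as sketched below. The host name is a placeholder, and carrying the session in a Cookie header is an assumption; this page only says that a session is required, not how it is transported. The request is built but not sent.

```python
import json
import urllib.request

BASE_URL = "https://example.invalid"  # placeholder host, not the real service


def build_create_request(definition, name=None, session_cookie=None):
    """Build (without sending) a POST /api/pipelines request."""
    payload = {"definition": definition}
    if name is not None:
        if len(name) > 120:
            raise ValueError("name must be at most 120 characters")
        payload["name"] = name
    headers = {"Content-Type": "application/json"}
    if session_cookie:
        headers["Cookie"] = session_cookie  # assumption: cookie-based session
    return urllib.request.Request(
        BASE_URL + "/api/pipelines",
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )
```

Passing the result to urllib.request.urlopen would send it; a 201 response then carries the id, status, and initial_jobs fields shown above.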


GET /api/pipelines

List the caller's pipelines (most recent first, capped at 50).

Response 200

[
  {
    "id": "f1a2…",
    "name": "Dentists Paris",
    "status": "running",
    "created_at": "2026-05-27 10:14:02",
    "completed_at": null,
    "nodes_count": 3,
    "done_count": 1,
    "results_count": 187,
    "progress_pct": 42
  }
]

status is one of pending | running | done | failed | cancelled. nodes_count is derived from the stored definition. done_count is the number of stages already finished, results_count the rows aggregated across all stages so far, and progress_pct (0–100) a duration-weighted completion estimate — transform stages (filter, sort) count far less than scraping/enrichment stages, and the in-flight stage contributes its real sub-progress. The same progress_pct is also returned by GET /api/pipelines/{id}.
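
One way to picture a duration-weighted estimate is the sketch below. The weights (1 for transform stages, 20 for everything else) and the function name are invented for illustration; the actual weights used by the server are not documented here.

```python
# Hypothetical relative weights: transform stages are cheap, other stages are not.
TRANSFORM_TYPES = {"filter", "sort"}
TRANSFORM_WEIGHT = 1
DEFAULT_WEIGHT = 20


def progress_pct(stages):
    """stages: list of (node_type, fraction_done), fraction_done in [0, 1]."""
    total = done = 0.0
    for node_type, fraction in stages:
        weight = TRANSFORM_WEIGHT if node_type in TRANSFORM_TYPES else DEFAULT_WEIGHT
        total += weight
        done += weight * min(max(fraction, 0.0), 1.0)  # clamp sub-progress
    return round(100 * done / total) if total else 0
```

With these assumed weights, a finished scrap, a half-finished emails stage, and a pending filter give round(100 × 30 / 41) = 73.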


GET /api/pipelines/{id}

Return a single pipeline with its definition and the jobs spawned so far.

Response 200

{
  "id": "f1a2…",
  "user_id": 42,
  "name": "Dentists Paris",
  "definition": { "nodes": [...], "edges": [...] },
  "status": "running",
  "created_at": "2026-05-27 10:14:02",
  "completed_at": null,
  "progress_pct": 42,
  "jobs": [
    {
      "id": "job_abc",
      "job_type": "scrap",
      "status": "done",
      "pipeline_node_id": "n1",
      "results_count": 187,
      "error_message": null,
      "created_at": "2026-05-27 10:14:02",
      "completed_at": "2026-05-27 10:18:55"
    }
  ],
  "output_job": {
    "id": "job_xyz",
    "job_type": "verify_emails",
    "results_count": 142,
    "status": "done",
    "download_available": true
  }
}

output_job is the pipeline's final dataset: the output of the most-downstream stage that has actually produced rows. A pipeline filters and reduces rather than sums, so output_job.results_count matches the headline count, not the sum of all stages. Download it via GET /api/jobs/{id}/download using output_job.id, in csv, xlsx, or json. output_job is null while the pipeline has produced nothing downloadable yet. download_available reports whether a CSV is still on disk and unexpired; the CSV may be final or partial, so downloads also work for running and stopped pipelines.
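
A client that waits for the final dataset might poll this endpoint as sketched below. The helper is hypothetical: fetch_pipeline stands for any callable that returns the parsed JSON of GET /api/pipelines/{id}, and the polling interval and limit are arbitrary.

```python
import time

TERMINAL = {"done", "failed", "cancelled"}


def wait_for_output(fetch_pipeline, interval=5.0, max_polls=120, sleep=time.sleep):
    """Poll until the pipeline is terminal; return output_job.id if downloadable, else None."""
    for _ in range(max_polls):
        pipeline = fetch_pipeline()
        if pipeline["status"] in TERMINAL:
            job = pipeline.get("output_job")
            if job and job.get("download_available"):
                return job["id"]
            return None
        sleep(interval)
    raise TimeoutError("pipeline did not finish in time")
```

The returned id is the one to pass to GET /api/jobs/{id}/download.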


PATCH /api/pipelines/{id}

Not implemented. The current API does not expose graph mutation after creation; clone the pipeline by re-issuing POST /api/pipelines with an updated definition. Returns 405 Method Not Allowed.


DELETE /api/pipelines/{id}

Not implemented. Pipelines are immutable once created; deletion will be added once retention policy is defined. Returns 405 Method Not Allowed.


POST /api/pipelines/{id}/run

Not implemented. Pipelines start automatically when created via POST /api/pipelines; there is no separate run endpoint. To re-execute an existing graph, post it again as a new pipeline.


GET /api/pipelines/{id}/nodes/{node_id}/input-columns

Inspect the schema of the CSV that will feed a given node. Useful for building filter UIs.

Behaviour. The endpoint locates the node's most recent predecessor job. If the predecessor is not yet done, the response carries an empty columns list and a reason code. Otherwise the predecessor's output CSV is read (up to 5000 rows) and each column is profiled for type, fill rate, and sample values.

Response 200 — predecessor done

{
  "columns": [
    {
      "name": "telephone",
      "type": "phone",
      "fill_rate": 0.92,
      "sample_values": ["+33 1 23 45 67 89", "0612345678"],
      "distinct_count": null
    },
    {
      "name": "categorie",
      "type": "category",
      "fill_rate": 1.0,
      "sample_values": ["dentiste", "orthodontiste"],
      "distinct_count": 4,
      "distinct_values": ["dentiste", "endodontiste", "orthodontiste", "stomatologue"]
    }
  ],
  "row_count": 187,
  "predecessor_job_id": "job_abc"
}

type is one of phone | email | url | number | category | text. A column is tagged category only if it has between 1 and 200 distinct non-empty values; otherwise it falls back to text. A typed verdict requires ≥ 80% of non-empty values to match the corresponding pattern.
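
The typing rule can be illustrated with the sketch below. The 80% threshold and the 1 to 200 distinct-value bound come from the paragraph above; the regular expressions and the order in which types are tried are assumptions, since the server's actual patterns are not published here.

```python
import re

# Hypothetical patterns, tried in this (assumed) order.
PATTERNS = [
    ("email", re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")),
    ("url", re.compile(r"^https?://\S+$")),
    ("phone", re.compile(r"^\+?[\d\s().-]{8,}$")),
    ("number", re.compile(r"^-?\d+(?:[.,]\d+)?$")),
]


def infer_type(values, threshold=0.8, max_categories=200):
    """Classify a column from its raw string values."""
    non_empty = [v.strip() for v in values if v and v.strip()]
    if not non_empty:
        return "text"
    for name, pattern in PATTERNS:
        hits = sum(1 for v in non_empty if pattern.match(v))
        if hits / len(non_empty) >= threshold:
            return name  # typed verdict: at least 80% of non-empty values match
    if 1 <= len(set(non_empty)) <= max_categories:
        return "category"
    return "text"
```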

Response 200 — no usable input

{ "columns": [], "reason": "no_predecessor" }

| reason | Meaning |
|---|---|
| no_predecessor | The node is a root, or has no incoming edge yet. |
| no_data_yet | Predecessor job exists but is not in status done. |
| no_csv_found | Predecessor finished but no output CSV is on disk. |
| csv_read_error | The CSV file could not be parsed. |

POST /api/pipelines/{id}/nodes/{node_id}/filter-preview

Apply a set of filter rules in memory against the upstream CSV and return the match count plus a small sample. No job is created; no state is mutated.

The target node must be of type filter. The body uses the same rules shape that filter nodes persist in their config.rules; previews are computed by the same function the worker uses at execution time, so the count is authoritative for the data inspected.

Request body

{
  "rules": {
    "logic": "AND",
    "conditions": [
      {"column": "fill_rate", "op": ">=", "value": 0.5},
      {"column": "categorie", "op": "in", "value": ["dentiste", "orthodontiste"]}
    ]
  }
}

The exact rule grammar is defined by the filter module (see Filter module).

Response 200

{
  "total": 187,
  "matched": 73,
  "samples": [
    {"nom": "Cabinet Dupont", "telephone": "0123456789", "categorie": "dentiste"}
  ],
  "predecessor_job_id": "job_abc",
  "fieldnames": ["nom", "telephone", "categorie", "site_web"],
  "capped": false
}

samples contains up to 5 matched rows with empty fields stripped. capped is true when the upstream CSV exceeded the 5000-row preview limit — in that case total reflects only the inspected window, but the matched/total ratio remains representative.

When the predecessor is not ready, the response is the same {total, matched, samples, reason} skeleton with all counts at 0. Possible reason codes mirror the input-columns endpoint: no_predecessor, no_data_yet, no_csv_found.

Specific causes: 400 target node is not of type filter, or rule application raised; 500 CSV could not be read.
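
To make the preview semantics concrete, here is an in-memory sketch. It supports only the two operators seen in the request example (>= and in) and assumes OR as the alternative to AND; the real grammar is defined by the Filter module, and the function names are hypothetical.

```python
def _to_number(raw):
    """Parse a CSV cell as a float, or return None when it is not numeric."""
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None


def condition_matches(row, condition):
    cell = row.get(condition["column"], "")
    op, expected = condition["op"], condition["value"]
    if op == "in":
        return cell in expected
    if op == ">=":
        number = _to_number(cell)
        return number is not None and number >= expected
    raise ValueError(f"unsupported operator: {op}")


def preview(rows, rules, sample_size=5):
    """Count matching rows and return up to sample_size of them, empty fields stripped."""
    combine = all if rules.get("logic", "AND") == "AND" else any  # assumption: else means OR
    matched = [r for r in rows if combine(condition_matches(r, c) for c in rules["conditions"])]
    samples = [{k: v for k, v in r.items() if v != ""} for r in matched[:sample_size]]
    return {"total": len(rows), "matched": len(matched), "samples": samples}
```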


Lifecycle summary

  1. POST /api/pipelines validates the graph, persists the pipeline as running, and spawns one job per root node.
  2. As each job reaches done, the worker reads its CSV, transforms rows for the successor's input type, and creates the next job. Empty outputs short-circuit the branch.
  3. When every spawned job has reached a terminal status (done, failed, cancelled, expired), the pipeline is finalized as done if all succeeded, otherwise failed.
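
The finalization rule in step 3 reduces to a small function. This is a sketch with a hypothetical name; returning None for an empty job list is an assumption.

```python
TERMINAL_JOB_STATUSES = {"done", "failed", "cancelled", "expired"}


def final_status(job_statuses):
    """Return "done" or "failed" once every job is terminal, else None (still running)."""
    if not job_statuses or any(s not in TERMINAL_JOB_STATUSES for s in job_statuses):
        return None
    return "done" if all(s == "done" for s in job_statuses) else "failed"
```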