EN
Copied
API

API Pipelines

Composer et exécuter des DAG d'étapes de scraping, enrichissement et transformation sous /api/pipelines.

API Pipelines

Un pipeline est un graphe orienté acyclique (DAG) de nodes qui enchaîne des étapes de scraping, enrichissement et transformation. Soumettre un pipeline lance les jobs racines de façon synchrone ; les jobs en aval sont engendrés à mesure que chaque prédécesseur atteint done.

Tous les endpoints sont sous /api/pipelines et requièrent une session authentifiée. Les routes mutantes requièrent en plus un compte actif (non suspendu). Erreurs génériques : 401 pas de session, 403 non propriétaire / compte suspendu, 404 pipeline ou node inconnu — les causes spécifiques sont listées en ligne.

Voir aussi : Orchestration de pipelines et Module filter.

Forme du graphe

Le document definition décrit le DAG. Les edges sont explicites et référencent des ids de node ; ils ne sont pas inférés d'un champ inputs par node.

{
  "nodes": [
    {"id": "n1", "type": "scrap",  "config": {"queries": ["dentist"], "zones": ["Paris"]}, "x": 100, "y": 100},
    {"id": "n2", "type": "emails", "config": {"mode": "normal"}, "x": 320, "y": 100},
    {"id": "n3", "type": "verify", "config": {}, "x": 540, "y": 100}
  ],
  "edges": [
    {"id": "e1", "from": "n1", "to": "n2"},
    {"id": "e2", "from": "n2", "to": "n3"}
  ]
}

Types de node

Type Rôle Accepte (entrée) Produit (sortie)
scrap Scrape Google Maps (racine) aucune pois
import Import CSV/Sheets (racine) aucune pois
reviews Récupérer les avis des POIs pois_any reviews
emails Découvrir les emails depuis les sites pois_any pois_email
verify Vérification SMTP des emails pois_email verified
socials Découvrir les profils sociaux pois_any pois
dead_check Détecter les POIs inactifs pois_any pois
techstack Détecter la pile tech du site pois_any pois
ads_intelligence Détecter les campagnes publicitaires pois_any pois
brand_assets Extraire logo et assets de marque pois_any pois
filter Appliquer un filtre à base de règles any_pois passthrough
sort Réordonner les lignes par colonne any_pois passthrough

filter et sort préservent le type amont ; la compatibilité de types est résolue en remontant jusqu'au premier ancêtre non-passthrough.

Règles de validation

Le serveur rejette une définition avec HTTP 400 si l'une des conditions suivantes est vraie :

Règle Message d'erreur
Liste nodes vide Pipeline vide
Plus de 20 nodes Trop de nodes (max 20)
Id de node en doublon IDs de nodes en doublon
type inconnu Type de node inconnu : ...
Extrémité d'edge référence un node manquant Edge référence un node inexistant
Boucle (from == to) Edge vers soi-même interdit
Type racine branché comme successeur Le node '...' ne peut pas avoir de prédécesseur
Sortie incompatible avec entrée Connexion X → Y incompatible
Node avec plusieurs prédécesseurs (limite MVP) Le node ... a plusieurs prédécesseurs

Les racines doivent être scrap ou import. Le fan-out (un node alimentant plusieurs successeurs) est autorisé ; le fan-in ne l'est pas.


POST /api/pipelines

Crée un pipeline et lance ses jobs racines.

Corps de requête

{
  "name": "Dentistes Paris",
  "definition": { "nodes": [...], "edges": [...] }
}

name est optionnel (≤ 120 caractères, défaut "Pipeline").

Réponse 201

{
  "id": "f1a2…-uuid",
  "status": "running",
  "initial_jobs": ["job_abc", "job_def"]
}

Cause spécifique : 400 la définition échoue à une règle de validation, ou la création du job racine échoue. En cas d'échec racine, le pipeline est persisté avec status = failed.


GET /api/pipelines

Liste les pipelines de l'appelant (plus récents d'abord, plafonné à 50).

Réponse 200

[
  {
    "id": "f1a2…",
    "name": "Dentistes Paris",
    "status": "running",
    "created_at": "2026-05-27 10:14:02",
    "completed_at": null,
    "nodes_count": 3
  }
]

status prend l'une des valeurs pending | running | done | failed | cancelled. nodes_count est dérivé de la définition stockée.


GET /api/pipelines/{id}

Retourne un pipeline avec sa définition et les jobs engendrés jusqu'ici.

Réponse 200

{
  "id": "f1a2…",
  "user_id": 42,
  "name": "Dentistes Paris",
  "definition": { "nodes": [...], "edges": [...] },
  "status": "running",
  "created_at": "2026-05-27 10:14:02",
  "completed_at": null,
  "progress_pct": 42,
  "jobs": [
    {
      "id": "job_abc",
      "job_type": "scrap",
      "status": "done",
      "pipeline_node_id": "n1",
      "results_count": 187,
      "error_message": null,
      "created_at": "2026-05-27 10:14:02",
      "completed_at": "2026-05-27 10:18:55"
    }
  ],
  "output_job": {
    "id": "job_xyz",
    "job_type": "verify_emails",
    "results_count": 142,
    "status": "done",
    "download_available": true
  }
}

output_job est le jeu de données final du pipeline — la sortie de l'étape la plus en aval ayant réellement produit des lignes (un pipeline filtre/réduit, il n'additionne pas ; output_job.results_count correspond donc au compteur affiché, pas à la somme des étapes). On le télécharge via GET /api/jobs/{id}/download avec output_job.id, en csv / xlsx / json. Il vaut null tant que le pipeline n'a rien produit de téléchargeable, et download_available indique si un CSV (final ou partiel — donc valable aussi pour un pipeline en cours/arrêté) est encore présent sur disque et non expiré.


PATCH /api/pipelines/{id}

Non implémenté. L'API actuelle n'expose pas de mutation du graphe après création ; cloner le pipeline en relançant POST /api/pipelines avec une définition mise à jour. Retourne 405 Method Not Allowed.


DELETE /api/pipelines/{id}

Non implémenté. Les pipelines sont immuables une fois créés ; la suppression sera ajoutée lorsque la politique de rétention sera définie. Retourne 405 Method Not Allowed.


POST /api/pipelines/{id}/cancel

Arrête le pipeline entier : l'étape en cours est stoppée et les étapes restantes ne sont pas lancées. Les résultats partiels déjà extraits restent téléchargeables sur chaque job concerné. À distinguer de l'arrêt d'une seule étape (POST /api/jobs/{id}/cancel), qui met simplement la pipeline en pause sur cette étape.

Le pipeline passe en statut cancelled. Renvoie 400 si le pipeline est déjà terminal (done, failed, cancelled).

Réponse200 OK, {"ok": true}.


POST /api/pipelines/{id}/nodes/{node_id}/continue

« Continuer avec les résultats ». Quand une étape de scraping a été arrêtée (assez de résultats) ou a crashé, la pipeline se met en pause sur cette étape (les suivantes ne démarrent pas automatiquement). Cet endpoint relance la chaîne à partir de cette étape en consommant ses résultats partiels : il crée le(s) job(s) de l'étape suivante.

Conditions : le dernier job du node est cancelled ou failed, et la pipeline n'est pas terminale. Si l'étape n'a produit aucune ligne, la branche est court-circuitée (pas de successeur créé). Renvoie 400 si l'étape n'est pas dans un état poursuivable.

Réponse200 OK, {"ok": true}.


POST /api/pipelines/{id}/run

Non implémenté. Les pipelines démarrent automatiquement à la création via POST /api/pipelines ; aucun endpoint de run séparé. Pour ré-exécuter un graphe, le re-poster comme nouveau pipeline.


GET /api/pipelines/{id}/nodes/{node_id}/input-columns

Inspecte le schéma du CSV qui alimentera un node donné. Utile pour construire des UIs de filtre.

Comportement. L'endpoint localise le job prédécesseur le plus récent du node. Si le prédécesseur n'est pas encore done, la réponse porte une liste columns vide et un code reason. Sinon le CSV de sortie est lu (jusqu'à 5000 lignes) et chaque colonne est profilée pour type, taux de remplissage et exemples.

Réponse 200 — prédécesseur terminé

{
  "columns": [
    {
      "name": "telephone",
      "type": "phone",
      "fill_rate": 0.92,
      "sample_values": ["+33 1 23 45 67 89", "0612345678"],
      "distinct_count": null
    },
    {
      "name": "categorie",
      "type": "category",
      "fill_rate": 1.0,
      "sample_values": ["dentiste", "orthodontiste"],
      "distinct_count": 4,
      "distinct_values": ["dentiste", "endodontiste", "orthodontiste", "stomatologue"]
    }
  ],
  "row_count": 187,
  "predecessor_job_id": "job_abc"
}

type prend l'une des valeurs phone | email | url | number | category | text. Une colonne est étiquetée category uniquement si elle a entre 1 et 200 valeurs non vides distinctes ; sinon elle retombe à text. Un verdict typé requiert ≥ 80% des valeurs non vides correspondant au pattern.

Réponse 200 — pas d'entrée utilisable

{ "columns": [], "reason": "no_predecessor" }
reason Signification
no_predecessor Le node est une racine, ou n'a pas encore d'edge entrant.
no_data_yet Le job prédécesseur existe mais n'est pas done.
no_csv_found Le prédécesseur a fini mais aucun CSV n'est sur disque.
csv_read_error Le fichier CSV n'a pas pu être parsé.

POST /api/pipelines/{id}/nodes/{node_id}/filter-preview

Applique un jeu de règles de filtre en mémoire contre le CSV amont et renvoie le nombre de correspondances plus un petit échantillon. Aucun job n'est créé ; aucun état n'est muté.

Le node cible doit être de type filter. Le corps utilise la même forme rules que les nodes filter persistent dans leur config.rules ; les previews sont calculés par la même fonction que le worker utilise à l'exécution, donc le compte fait autorité pour les données inspectées.

Corps de requête

{
  "rules": {
    "logic": "AND",
    "conditions": [
      {"column": "fill_rate", "op": ">=", "value": 0.5},
      {"column": "categorie", "op": "in", "value": ["dentiste", "orthodontiste"]}
    ]
  }
}

La grammaire exacte des règles est définie par le module filter (voir Module filter).

Réponse 200

{
  "total": 187,
  "matched": 73,
  "samples": [
    {"nom": "Cabinet Dupont", "telephone": "0123456789", "categorie": "dentiste"}
  ],
  "predecessor_job_id": "job_abc",
  "fieldnames": ["nom", "telephone", "categorie", "site_web"],
  "capped": false
}

samples contient jusqu'à 5 lignes correspondantes avec les champs vides retirés. capped vaut true quand le CSV amont a dépassé la limite preview de 5000 lignes — dans ce cas total ne reflète que la fenêtre inspectée, mais le ratio matched/total reste représentatif.

Quand le prédécesseur n'est pas prêt, la réponse est la même squelette {total, matched, samples, reason} avec tous les compteurs à 0. Les codes reason possibles miroitent l'endpoint input-columns : no_predecessor, no_data_yet, no_csv_found.

Causes spécifiques : 400 node cible n'est pas de type filter, ou application de règle a levé une erreur ; 500 CSV non lisible.


Résumé du cycle de vie

  1. POST /api/pipelines valide le graphe, persiste le pipeline en running, et engendre un job par node racine.
  2. Quand un job se termine normalement (done), le worker lit son CSV, transforme les lignes pour le type d'entrée du successeur et crée le job suivant. Une sortie vide court-circuite la branche.
  3. Quand un job est arrêté (cancelled) ou crashe (failed), la pipeline se met en pause sur cette étape : aucune suite automatique. L'utilisateur reprend explicitement via POST /api/pipelines/{id}/nodes/{node_id}/continue (« Continuer avec les résultats »), qui crée le job suivant à partir du CSV partiel.
  4. La pipeline est finalisée en done quand toutes les étapes atteignables sont terminales et résolues. Un arrêt global (POST /api/pipelines/{id}/cancel) la finalise en cancelled. Un job expired la finalise en failed. Une étape cancelled/failed non poursuivie ne finalise rien : la pipeline reste running (en pause) jusqu'à un « Continuer » ou un arrêt global.