Wiki/baserow/seed/seed.py
Corentin JOGUET a0266b886c
Some checks are pending
CI / Lint bridge (Biome) (push) Waiting to run
CI / Type-check bridge (push) Blocked by required conditions
CI / Tests unit bridge (push) Blocked by required conditions
CI / Tests integration bridge (push) Blocked by required conditions
CI / Security scan (push) Waiting to run
CI / Docker build + healthcheck (push) Blocked by required conditions
feat(baserow): add formulas pass + related field naming to seed
- schema.json : 17 formulas (rollups + heures_restantes) ajoutees + related_field_name explicite sur les 10 liens
- seed.py : 3 nouvelles methodes (create_formula_field, rename_field, create_link_field returns dict)
- seed.py : pass 5/6 renomme automatiquement les related fields apres link creation
- seed.py : pass 6/6 cree les formulas (idempotent)
- README.md : section formulas updated

Iteration 2 du plan Fast-App couverte. Apres seed, les rollups
(formation_heures_attribuees, personne_heures_restantes_total, etc.)
sont automatiques.
2026-05-07 18:15:21 +02:00

285 lines
11 KiB
Python

#!/usr/bin/env python3
"""Baserow seed script — cree les 9 tables formation-hub via API.
Usage :
BASEROW_URL=http://localhost:8080 \
BASEROW_EMAIL=admin@acadenice.fr \
BASEROW_PASSWORD=... \
python baserow/seed/seed.py [--schema baserow/seed/schema.json]
Process :
1. Login (recupere JWT)
2. Create or reuse workspace
3. Create or reuse database
4. Create tables (1 pass)
5. Create primitive fields per table
6. Create link_row fields (2nd pass — once the tables exist), then rename their related fields
7. Create formula fields (3rd pass)
Idempotent : skip ce qui existe deja.
"""
import argparse
import json
import os
import sys
from pathlib import Path
from typing import Any
import requests
class BaserowSeed:
def __init__(self, base_url: str, email: str, password: str) -> None:
self.base_url = base_url.rstrip("/")
self.email = email
self.password = password
self.token: str | None = None
self.session = requests.Session()
def _headers(self) -> dict[str, str]:
if self.token is None:
raise RuntimeError("Not logged in")
return {"Authorization": f"JWT {self.token}", "Content-Type": "application/json"}
def login(self) -> None:
url = f"{self.base_url}/api/user/token-auth/"
r = self.session.post(url, json={"email": self.email, "password": self.password})
r.raise_for_status()
self.token = r.json()["token"]
print(f" [auth] Logged in as {self.email}")
def get_or_create_workspace(self, name: str) -> int:
r = self.session.get(f"{self.base_url}/api/workspaces/", headers=self._headers())
r.raise_for_status()
for w in r.json():
if w["name"] == name:
print(f" [workspace] Reuse '{name}' id={w['id']}")
return w["id"]
r = self.session.post(
f"{self.base_url}/api/workspaces/", headers=self._headers(), json={"name": name}
)
r.raise_for_status()
wid = r.json()["id"]
print(f" [workspace] Created '{name}' id={wid}")
return wid
def get_or_create_database(self, workspace_id: int, name: str) -> int:
r = self.session.get(
f"{self.base_url}/api/applications/workspace/{workspace_id}/", headers=self._headers()
)
r.raise_for_status()
for app in r.json():
if app["name"] == name and app["type"] == "database":
print(f" [db] Reuse '{name}' id={app['id']}")
return app["id"]
r = self.session.post(
f"{self.base_url}/api/applications/workspace/{workspace_id}/",
headers=self._headers(),
json={"name": name, "type": "database"},
)
r.raise_for_status()
did = r.json()["id"]
print(f" [db] Created '{name}' id={did}")
return did
def list_tables(self, database_id: int) -> dict[str, int]:
r = self.session.get(
f"{self.base_url}/api/database/tables/database/{database_id}/", headers=self._headers()
)
r.raise_for_status()
return {t["name"]: t["id"] for t in r.json()}
def create_table_with_minimal(self, database_id: int, name: str) -> int:
r = self.session.post(
f"{self.base_url}/api/database/tables/database/{database_id}/",
headers=self._headers(),
json={"name": name},
)
r.raise_for_status()
tid = r.json()["id"]
print(f" [table] Created '{name}' id={tid}")
return tid
def list_fields(self, table_id: int) -> list[dict[str, Any]]:
r = self.session.get(
f"{self.base_url}/api/database/fields/table/{table_id}/", headers=self._headers()
)
r.raise_for_status()
return r.json()
def create_field(self, table_id: int, field_def: dict[str, Any]) -> int:
payload = self._field_to_payload(field_def)
r = self.session.post(
f"{self.base_url}/api/database/fields/table/{table_id}/",
headers=self._headers(),
json=payload,
)
if r.status_code >= 300:
print(f" [ERROR] field {field_def['name']} payload={payload} resp={r.text}")
r.raise_for_status()
return r.json()["id"]
def update_field(self, field_id: int, field_def: dict[str, Any]) -> None:
payload = self._field_to_payload(field_def)
r = self.session.patch(
f"{self.base_url}/api/database/fields/{field_id}/",
headers=self._headers(),
json=payload,
)
r.raise_for_status()
def delete_field(self, field_id: int) -> None:
r = self.session.delete(
f"{self.base_url}/api/database/fields/{field_id}/", headers=self._headers()
)
r.raise_for_status()
@staticmethod
def _field_to_payload(field_def: dict[str, Any]) -> dict[str, Any]:
ftype = field_def["type"]
payload: dict[str, Any] = {"name": field_def["name"], "type": ftype}
if ftype == "number":
payload["number_decimal_places"] = field_def.get("number_decimal_places", 0)
payload["number_negative"] = True
elif ftype == "single_select" or ftype == "multiple_select":
payload["select_options"] = field_def.get("select_options", [])
elif ftype == "date":
payload["date_format"] = "ISO"
elif ftype == "long_text":
payload["long_text_enable_rich_text"] = True
return payload
def create_link_field(self, from_table_id: int, name: str, to_table_id: int) -> dict[str, Any]:
r = self.session.post(
f"{self.base_url}/api/database/fields/table/{from_table_id}/",
headers=self._headers(),
json={"name": name, "type": "link_row", "link_row_table_id": to_table_id},
)
if r.status_code >= 300:
print(f" [ERROR] link {name} resp={r.text}")
r.raise_for_status()
return r.json()
def rename_field(self, field_id: int, new_name: str) -> None:
r = self.session.patch(
f"{self.base_url}/api/database/fields/{field_id}/",
headers=self._headers(),
json={"name": new_name},
)
r.raise_for_status()
def create_formula_field(self, table_id: int, name: str, expression: str) -> int:
r = self.session.post(
f"{self.base_url}/api/database/fields/table/{table_id}/",
headers=self._headers(),
json={"name": name, "type": "formula", "formula": expression},
)
if r.status_code >= 300:
print(f" [ERROR] formula {name} expr={expression} resp={r.text}")
r.raise_for_status()
return r.json()["id"]
def seed(self, schema: dict[str, Any]) -> None:
print(f"\n[1/5] Login {self.base_url}")
self.login()
print(f"\n[2/5] Workspace '{schema['workspace_name']}'")
ws_id = self.get_or_create_workspace(schema["workspace_name"])
print(f"\n[3/5] Database '{schema['database_name']}'")
db_id = self.get_or_create_database(ws_id, schema["database_name"])
print(f"\n[4/5] Tables + primitive fields")
existing = self.list_tables(db_id)
table_ids: dict[str, int] = dict(existing)
for table in schema["tables"]:
tname = table["name"]
if tname in existing:
tid = existing[tname]
print(f" [table] Reuse '{tname}' id={tid}")
else:
tid = self.create_table_with_minimal(db_id, tname)
table_ids[tname] = tid
self._sync_fields(tid, table)
print(f"\n[5/6] Link fields (2nd pass)")
for link in schema["links"]:
from_id = table_ids[link["from_table"]]
to_id = table_ids[link["to_table"]]
existing_fields = {f["name"]: f for f in self.list_fields(from_id)}
if link["from_field"] in existing_fields:
print(f" [link] Reuse {link['from_table']}.{link['from_field']} -> {link['to_table']}")
continue
link_field = self.create_link_field(from_id, link["from_field"], to_id)
print(f" [link] Created {link['from_table']}.{link['from_field']} -> {link['to_table']}")
related_id = link_field.get("link_row_related_field_id") or link_field.get("link_row_related_field")
related_name = link.get("related_field_name")
if related_name and related_id:
self.rename_field(related_id, related_name)
print(f" [link] Renamed related field id={related_id} -> '{related_name}'")
print(f"\n[6/6] Formula fields (3rd pass)")
for f in schema.get("formulas", []):
tname = f["table"]
tid = table_ids[tname]
existing_fields = {x["name"]: x for x in self.list_fields(tid)}
if f["name"] in existing_fields:
print(f" [formula] Reuse {tname}.{f['name']}")
continue
self.create_formula_field(tid, f["name"], f["expression"])
print(f" [formula] Created {tname}.{f['name']}")
print("\n=== Seed OK ===")
print(f" Workspace: {schema['workspace_name']}")
print(f" Database : {schema['database_name']}")
print(f" Tables : {len(table_ids)}")
print(f" Links : {len(schema['links'])}")
print(f" Formulas : {len(schema.get('formulas', []))}")
def _sync_fields(self, table_id: int, table_def: dict[str, Any]) -> None:
existing = {f["name"]: f for f in self.list_fields(table_id)}
# Default Baserow table comes with one primary field "Name". Rename it to our primary if needed.
primary_target = table_def["primary_field"]
if primary_target not in existing:
primary_existing = next((f for f in existing.values() if f.get("primary")), None)
if primary_existing is not None:
# Find the primary field def
primary_def = next(f for f in table_def["fields"] if f.get("primary"))
self.update_field(primary_existing["id"], primary_def)
print(f" [field] Renamed primary -> '{primary_target}'")
existing[primary_target] = {**primary_existing, "name": primary_target}
if primary_existing["name"] != primary_target:
del existing[primary_existing["name"]]
for field_def in table_def["fields"]:
fname = field_def["name"]
if fname in existing:
continue
self.create_field(table_id, field_def)
print(f" [field] Created '{fname}' ({field_def['type']})")
def main() -> int:
    """Command-line entry point: load the schema and seed the Baserow instance.

    Reads ``BASEROW_URL`` (default ``http://localhost:8080``), ``BASEROW_EMAIL``
    and ``BASEROW_PASSWORD`` from the environment and the schema path from
    ``--schema`` (default: ``schema.json`` next to this file).

    Returns:
        0 on success, 1 when credentials are missing, 2 when an API call
        returned an HTTP error status.
    """
    parser = argparse.ArgumentParser(description="Seed Baserow formation-hub")
    parser.add_argument("--schema", default=str(Path(__file__).parent / "schema.json"))
    args = parser.parse_args()

    base_url = os.environ.get("BASEROW_URL", "http://localhost:8080")
    email = os.environ.get("BASEROW_EMAIL")
    password = os.environ.get("BASEROW_PASSWORD")
    if not email or not password:
        print("ERROR: set BASEROW_EMAIL et BASEROW_PASSWORD env vars", file=sys.stderr)
        return 1

    # JSON is UTF-8 by specification; do not depend on the platform default encoding.
    schema = json.loads(Path(args.schema).read_text(encoding="utf-8"))
    seeder = BaserowSeed(base_url, email, password)
    try:
        seeder.seed(schema)
    except requests.HTTPError as e:
        # A requests.Response is falsy for any 4xx/5xx status, so a truthiness
        # test would always discard the error body; compare against None.
        body = e.response.text if e.response is not None else ""
        print(f"\nHTTP error: {e}\nResponse: {body}", file=sys.stderr)
        return 2
    return 0
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit status.
    raise SystemExit(main())