#!/usr/bin/env python3
"""Baserow seed script - creates the 9 formation-hub tables via the REST API.

Usage:
    BASEROW_URL=http://localhost:8080 \
    BASEROW_EMAIL=admin@acadenice.fr \
    BASEROW_PASSWORD=... \
    python baserow/seed/seed.py [--schema baserow/seed/schema.json]

Process:
  1. Login (obtain a JWT)
  2. Create or reuse the workspace
  3. Create or reuse the database
  4. Create tables and their primitive fields (1st pass)
  5. Create link_row fields (2nd pass - once every table exists)
  6. Create formula fields (3rd pass)

Idempotent: anything that already exists is skipped.
"""

import argparse
import json
import os
import sys
from pathlib import Path
from typing import Any

import requests


class BaserowSeed:
    """Thin client that seeds a Baserow instance from a schema dictionary.

    The schema keys read by this class are ``workspace_name``,
    ``database_name``, ``tables`` (each with ``name``, ``primary_field`` and
    ``fields``), ``links`` and the optional ``formulas``.
    """

    def __init__(self, base_url: str, email: str, password: str, timeout: float = 30.0) -> None:
        """Store connection settings; no network traffic happens here.

        Args:
            base_url: Root URL of the Baserow instance (trailing ``/`` is stripped).
            email: Account e-mail used for ``login``.
            password: Account password used for ``login``.
            timeout: Per-request timeout in seconds. Without one, ``requests``
                waits forever on an unresponsive server.
        """
        self.base_url = base_url.rstrip("/")
        self.email = email
        self.password = password
        self.timeout = timeout
        self.token: str | None = None
        self.session = requests.Session()

    def _headers(self) -> dict[str, str]:
        """Return the authenticated JSON headers.

        Raises:
            RuntimeError: if ``login`` has not stored a token yet.
        """
        if self.token is None:
            raise RuntimeError("Not logged in")
        return {"Authorization": f"JWT {self.token}", "Content-Type": "application/json"}

    def _request(
        self, method: str, path: str, *, payload: dict[str, Any] | None = None
    ) -> requests.Response:
        """Send one authenticated request and return the raw response.

        Status checking is left to the caller so that it can log the response
        body before raising.

        Args:
            method: HTTP verb (``GET``, ``POST``, ``PATCH``, ``DELETE``).
            path: API path starting with ``/``; appended to ``base_url``.
            payload: Optional JSON body.
        """
        return self.session.request(
            method,
            f"{self.base_url}{path}",
            headers=self._headers(),
            json=payload,
            timeout=self.timeout,
        )

    def login(self) -> None:
        """Authenticate with e-mail/password and store the returned token.

        Raises:
            requests.HTTPError: if the server rejects the credentials.
        """
        url = f"{self.base_url}/api/user/token-auth/"
        r = self.session.post(
            url,
            json={"email": self.email, "password": self.password},
            timeout=self.timeout,
        )
        r.raise_for_status()
        self.token = r.json()["token"]
        print(f" [auth] Logged in as {self.email}")

    def get_or_create_workspace(self, name: str) -> int:
        """Return the id of the workspace called ``name``, creating it if absent."""
        r = self._request("GET", "/api/workspaces/")
        r.raise_for_status()
        for w in r.json():
            if w["name"] == name:
                print(f" [workspace] Reuse '{name}' id={w['id']}")
                return w["id"]
        r = self._request("POST", "/api/workspaces/", payload={"name": name})
        r.raise_for_status()
        wid = r.json()["id"]
        print(f" [workspace] Created '{name}' id={wid}")
        return wid

    def get_or_create_database(self, workspace_id: int, name: str) -> int:
        """Return the id of the database application ``name`` in the workspace.

        Only applications whose ``type`` is ``"database"`` are considered for
        reuse; otherwise a new one is created.
        """
        path = f"/api/applications/workspace/{workspace_id}/"
        r = self._request("GET", path)
        r.raise_for_status()
        for app in r.json():
            if app["name"] == name and app["type"] == "database":
                print(f" [db] Reuse '{name}' id={app['id']}")
                return app["id"]
        r = self._request("POST", path, payload={"name": name, "type": "database"})
        r.raise_for_status()
        did = r.json()["id"]
        print(f" [db] Created '{name}' id={did}")
        return did

    def list_tables(self, database_id: int) -> dict[str, int]:
        """Return a ``{table name: table id}`` mapping for the database."""
        r = self._request("GET", f"/api/database/tables/database/{database_id}/")
        r.raise_for_status()
        return {t["name"]: t["id"] for t in r.json()}

    def create_table_with_minimal(self, database_id: int, name: str) -> int:
        """Create a table by sending only its name and return the new table id."""
        r = self._request(
            "POST", f"/api/database/tables/database/{database_id}/", payload={"name": name}
        )
        r.raise_for_status()
        tid = r.json()["id"]
        print(f" [table] Created '{name}' id={tid}")
        return tid

    def list_fields(self, table_id: int) -> list[dict[str, Any]]:
        """Return the field descriptions of a table as sent by the API."""
        r = self._request("GET", f"/api/database/fields/table/{table_id}/")
        r.raise_for_status()
        return r.json()

    def create_field(self, table_id: int, field_def: dict[str, Any]) -> int:
        """Create a primitive field from a schema definition and return its id.

        The payload and response body are printed before raising so a
        rejected field can be diagnosed from the console output.
        """
        payload = self._field_to_payload(field_def)
        r = self._request("POST", f"/api/database/fields/table/{table_id}/", payload=payload)
        if r.status_code >= 300:
            print(f" [ERROR] field {field_def['name']} payload={payload} resp={r.text}")
        r.raise_for_status()
        return r.json()["id"]

    def update_field(self, field_id: int, field_def: dict[str, Any]) -> None:
        """Patch an existing field so it matches a schema definition."""
        payload = self._field_to_payload(field_def)
        r = self._request("PATCH", f"/api/database/fields/{field_id}/", payload=payload)
        r.raise_for_status()

    def delete_field(self, field_id: int) -> None:
        """Delete a field by id."""
        r = self._request("DELETE", f"/api/database/fields/{field_id}/")
        r.raise_for_status()

    @staticmethod
    def _field_to_payload(field_def: dict[str, Any]) -> dict[str, Any]:
        """Translate a schema field definition into an API payload.

        ``name`` and ``type`` are always copied; type-specific options are
        added for ``number``, ``single_select``/``multiple_select``, ``date``
        and ``long_text``. Other types get no extra options.
        """
        ftype = field_def["type"]
        payload: dict[str, Any] = {"name": field_def["name"], "type": ftype}
        if ftype == "number":
            payload["number_decimal_places"] = field_def.get("number_decimal_places", 0)
            payload["number_negative"] = True
        elif ftype in ("single_select", "multiple_select"):
            payload["select_options"] = field_def.get("select_options", [])
        elif ftype == "date":
            payload["date_format"] = "ISO"
        elif ftype == "long_text":
            payload["long_text_enable_rich_text"] = True
        return payload

    def create_link_field(self, from_table_id: int, name: str, to_table_id: int) -> dict[str, Any]:
        """Create a ``link_row`` field and return the full API response body.

        The whole body is returned (not just the id) because the caller needs
        the id of the related field created on the target table.
        """
        r = self._request(
            "POST",
            f"/api/database/fields/table/{from_table_id}/",
            payload={"name": name, "type": "link_row", "link_row_table_id": to_table_id},
        )
        if r.status_code >= 300:
            print(f" [ERROR] link {name} resp={r.text}")
        r.raise_for_status()
        return r.json()

    def rename_field(self, field_id: int, new_name: str) -> None:
        """Rename a field without touching its type or options."""
        r = self._request(
            "PATCH", f"/api/database/fields/{field_id}/", payload={"name": new_name}
        )
        r.raise_for_status()

    def create_formula_field(self, table_id: int, name: str, expression: str) -> int:
        """Create a ``formula`` field and return its id."""
        r = self._request(
            "POST",
            f"/api/database/fields/table/{table_id}/",
            payload={"name": name, "type": "formula", "formula": expression},
        )
        if r.status_code >= 300:
            print(f" [ERROR] formula {name} expr={expression} resp={r.text}")
        r.raise_for_status()
        return r.json()["id"]

    def seed(self, schema: dict[str, Any]) -> None:
        """Run the six seeding phases for ``schema``.

        Existing workspaces, databases, tables and fields are matched by name
        and reused; only missing objects are created.

        Raises:
            requests.HTTPError: on any non-success API response.
            KeyError: if a link or formula references a table name that is
                neither in the schema nor already in the database.
        """
        print(f"\n[1/6] Login {self.base_url}")
        self.login()

        print(f"\n[2/6] Workspace '{schema['workspace_name']}'")
        ws_id = self.get_or_create_workspace(schema["workspace_name"])

        print(f"\n[3/6] Database '{schema['database_name']}'")
        db_id = self.get_or_create_database(ws_id, schema["database_name"])

        print("\n[4/6] Tables + primitive fields")
        existing = self.list_tables(db_id)
        table_ids: dict[str, int] = dict(existing)
        for table in schema["tables"]:
            tname = table["name"]
            if tname in existing:
                tid = existing[tname]
                print(f" [table] Reuse '{tname}' id={tid}")
            else:
                tid = self.create_table_with_minimal(db_id, tname)
                table_ids[tname] = tid
            self._sync_fields(tid, table)

        print("\n[5/6] Link fields (2nd pass)")
        for link in schema["links"]:
            from_id = table_ids[link["from_table"]]
            to_id = table_ids[link["to_table"]]
            # Re-listed for every link: an earlier link may have added a
            # related field to this same table.
            existing_fields = {f["name"]: f for f in self.list_fields(from_id)}
            if link["from_field"] in existing_fields:
                print(f" [link] Reuse {link['from_table']}.{link['from_field']} -> {link['to_table']}")
                continue
            link_field = self.create_link_field(from_id, link["from_field"], to_id)
            print(f" [link] Created {link['from_table']}.{link['from_field']} -> {link['to_table']}")
            # The related-field id is looked up under either response key.
            related_id = link_field.get("link_row_related_field_id") or link_field.get(
                "link_row_related_field"
            )
            related_name = link.get("related_field_name")
            if related_name and related_id:
                self.rename_field(related_id, related_name)
                print(f" [link] Renamed related field id={related_id} -> '{related_name}'")

        print("\n[6/6] Formula fields (3rd pass)")
        for f in schema.get("formulas", []):
            tname = f["table"]
            tid = table_ids[tname]
            existing_fields = {x["name"]: x for x in self.list_fields(tid)}
            if f["name"] in existing_fields:
                print(f" [formula] Reuse {tname}.{f['name']}")
                continue
            self.create_formula_field(tid, f["name"], f["expression"])
            print(f" [formula] Created {tname}.{f['name']}")

        print("\n=== Seed OK ===")
        print(f" Workspace: {schema['workspace_name']}")
        print(f" Database : {schema['database_name']}")
        print(f" Tables : {len(table_ids)}")
        print(f" Links : {len(schema['links'])}")
        print(f" Formulas : {len(schema.get('formulas', []))}")

    def _sync_fields(self, table_id: int, table_def: dict[str, Any]) -> None:
        """Make sure every primitive field of ``table_def`` exists on the table.

        If the table has no field named ``table_def["primary_field"]``, its
        current primary field is converted to it first. Every schema field
        whose name is still missing is then created.
        """
        existing = {f["name"]: f for f in self.list_fields(table_id)}

        # Default Baserow table comes with one primary field "Name".
        # Rename it to our primary if needed.
        primary_target = table_def["primary_field"]
        if primary_target not in existing:
            primary_existing = next((f for f in existing.values() if f.get("primary")), None)
            if primary_existing is not None:
                primary_def = next((f for f in table_def["fields"] if f.get("primary")), None)
                if primary_def is not None:
                    self.update_field(primary_existing["id"], primary_def)
                else:
                    # No schema field is flagged primary: rename only and keep
                    # the existing type, rather than failing with StopIteration.
                    self.rename_field(primary_existing["id"], primary_target)
                print(f" [field] Renamed primary -> '{primary_target}'")
                # The old name is necessarily different from primary_target
                # here, so re-key the bookkeeping entry.
                existing.pop(primary_existing["name"], None)
                existing[primary_target] = {**primary_existing, "name": primary_target}

        for field_def in table_def["fields"]:
            fname = field_def["name"]
            if fname in existing:
                continue
            self.create_field(table_id, field_def)
            print(f" [field] Created '{fname}' ({field_def['type']})")


def main() -> int:
    """Command-line entry point.

    Reads ``BASEROW_URL``, ``BASEROW_EMAIL`` and ``BASEROW_PASSWORD`` from the
    environment and the schema path from ``--schema``.

    Returns:
        0 on success, 1 if credentials are missing, 2 on an HTTP error.
    """
    parser = argparse.ArgumentParser(description="Seed Baserow formation-hub")
    parser.add_argument("--schema", default=str(Path(__file__).parent / "schema.json"))
    args = parser.parse_args()

    base_url = os.environ.get("BASEROW_URL", "http://localhost:8080")
    email = os.environ.get("BASEROW_EMAIL")
    password = os.environ.get("BASEROW_PASSWORD")
    if not email or not password:
        print("ERROR: set BASEROW_EMAIL et BASEROW_PASSWORD env vars", file=sys.stderr)
        return 1

    # Read as UTF-8 explicitly so the result does not depend on the platform
    # default encoding.
    schema = json.loads(Path(args.schema).read_text(encoding="utf-8"))
    seeder = BaserowSeed(base_url, email, password)
    try:
        seeder.seed(schema)
    except requests.HTTPError as e:
        # A Response is falsy for 4xx/5xx statuses, so a truthiness test would
        # always discard the error body; compare against None instead.
        body = e.response.text if e.response is not None else ""
        print(f"\nHTTP error: {e}\nResponse: {body}", file=sys.stderr)
        return 2
    return 0


if __name__ == "__main__":
    sys.exit(main())