Some checks are pending
CI / Lint bridge (Biome) (push) Waiting to run
CI / Type-check bridge (push) Blocked by required conditions
CI / Tests unit bridge (push) Blocked by required conditions
CI / Tests integration bridge (push) Blocked by required conditions
CI / Security scan (push) Waiting to run
CI / Docker build + healthcheck (push) Blocked by required conditions
- schema.json : 17 formulas (rollups + heures_restantes) ajoutees + related_field_name explicite sur les 10 liens - seed.py : 3 nouveaux methodes (create_formula_field, rename_field, create_link_field returns dict) - seed.py : pass 5/6 renomme automatiquement les related fields apres link creation - seed.py : pass 6/6 cree les formulas (idempotent) - README.md : section formulas updated Iteration 2 du plan Fast-App couverte. Apres seed, les rollups (formation_heures_attribuees, personne_heures_restantes_total, etc.) sont automatiques.
285 lines
11 KiB
Python
285 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""Baserow seed script — cree les 9 tables formation-hub via API.
|
|
|
|
Usage :
|
|
BASEROW_URL=http://localhost:8080 \
|
|
BASEROW_EMAIL=admin@acadenice.fr \
|
|
BASEROW_PASSWORD=... \
|
|
python baserow/seed/seed.py [--schema baserow/seed/schema.json]
|
|
|
|
Process :
|
|
1. Login (recupere JWT)
|
|
2. Create or reuse workspace
|
|
3. Create or reuse database
|
|
4. Create tables (1 pass)
|
|
5. Create primitive fields per table
|
|
6. Create link_row fields (2nd pass — apres que les tables existent)
|
|
|
|
Idempotent : skip ce qui existe deja.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import requests
|
|
|
|
|
|
class BaserowSeed:
|
|
def __init__(self, base_url: str, email: str, password: str) -> None:
|
|
self.base_url = base_url.rstrip("/")
|
|
self.email = email
|
|
self.password = password
|
|
self.token: str | None = None
|
|
self.session = requests.Session()
|
|
|
|
def _headers(self) -> dict[str, str]:
|
|
if self.token is None:
|
|
raise RuntimeError("Not logged in")
|
|
return {"Authorization": f"JWT {self.token}", "Content-Type": "application/json"}
|
|
|
|
def login(self) -> None:
|
|
url = f"{self.base_url}/api/user/token-auth/"
|
|
r = self.session.post(url, json={"email": self.email, "password": self.password})
|
|
r.raise_for_status()
|
|
self.token = r.json()["token"]
|
|
print(f" [auth] Logged in as {self.email}")
|
|
|
|
def get_or_create_workspace(self, name: str) -> int:
|
|
r = self.session.get(f"{self.base_url}/api/workspaces/", headers=self._headers())
|
|
r.raise_for_status()
|
|
for w in r.json():
|
|
if w["name"] == name:
|
|
print(f" [workspace] Reuse '{name}' id={w['id']}")
|
|
return w["id"]
|
|
r = self.session.post(
|
|
f"{self.base_url}/api/workspaces/", headers=self._headers(), json={"name": name}
|
|
)
|
|
r.raise_for_status()
|
|
wid = r.json()["id"]
|
|
print(f" [workspace] Created '{name}' id={wid}")
|
|
return wid
|
|
|
|
def get_or_create_database(self, workspace_id: int, name: str) -> int:
|
|
r = self.session.get(
|
|
f"{self.base_url}/api/applications/workspace/{workspace_id}/", headers=self._headers()
|
|
)
|
|
r.raise_for_status()
|
|
for app in r.json():
|
|
if app["name"] == name and app["type"] == "database":
|
|
print(f" [db] Reuse '{name}' id={app['id']}")
|
|
return app["id"]
|
|
r = self.session.post(
|
|
f"{self.base_url}/api/applications/workspace/{workspace_id}/",
|
|
headers=self._headers(),
|
|
json={"name": name, "type": "database"},
|
|
)
|
|
r.raise_for_status()
|
|
did = r.json()["id"]
|
|
print(f" [db] Created '{name}' id={did}")
|
|
return did
|
|
|
|
def list_tables(self, database_id: int) -> dict[str, int]:
|
|
r = self.session.get(
|
|
f"{self.base_url}/api/database/tables/database/{database_id}/", headers=self._headers()
|
|
)
|
|
r.raise_for_status()
|
|
return {t["name"]: t["id"] for t in r.json()}
|
|
|
|
def create_table_with_minimal(self, database_id: int, name: str) -> int:
|
|
r = self.session.post(
|
|
f"{self.base_url}/api/database/tables/database/{database_id}/",
|
|
headers=self._headers(),
|
|
json={"name": name},
|
|
)
|
|
r.raise_for_status()
|
|
tid = r.json()["id"]
|
|
print(f" [table] Created '{name}' id={tid}")
|
|
return tid
|
|
|
|
def list_fields(self, table_id: int) -> list[dict[str, Any]]:
|
|
r = self.session.get(
|
|
f"{self.base_url}/api/database/fields/table/{table_id}/", headers=self._headers()
|
|
)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
def create_field(self, table_id: int, field_def: dict[str, Any]) -> int:
|
|
payload = self._field_to_payload(field_def)
|
|
r = self.session.post(
|
|
f"{self.base_url}/api/database/fields/table/{table_id}/",
|
|
headers=self._headers(),
|
|
json=payload,
|
|
)
|
|
if r.status_code >= 300:
|
|
print(f" [ERROR] field {field_def['name']} payload={payload} resp={r.text}")
|
|
r.raise_for_status()
|
|
return r.json()["id"]
|
|
|
|
def update_field(self, field_id: int, field_def: dict[str, Any]) -> None:
|
|
payload = self._field_to_payload(field_def)
|
|
r = self.session.patch(
|
|
f"{self.base_url}/api/database/fields/{field_id}/",
|
|
headers=self._headers(),
|
|
json=payload,
|
|
)
|
|
r.raise_for_status()
|
|
|
|
def delete_field(self, field_id: int) -> None:
|
|
r = self.session.delete(
|
|
f"{self.base_url}/api/database/fields/{field_id}/", headers=self._headers()
|
|
)
|
|
r.raise_for_status()
|
|
|
|
@staticmethod
|
|
def _field_to_payload(field_def: dict[str, Any]) -> dict[str, Any]:
|
|
ftype = field_def["type"]
|
|
payload: dict[str, Any] = {"name": field_def["name"], "type": ftype}
|
|
if ftype == "number":
|
|
payload["number_decimal_places"] = field_def.get("number_decimal_places", 0)
|
|
payload["number_negative"] = True
|
|
elif ftype == "single_select" or ftype == "multiple_select":
|
|
payload["select_options"] = field_def.get("select_options", [])
|
|
elif ftype == "date":
|
|
payload["date_format"] = "ISO"
|
|
elif ftype == "long_text":
|
|
payload["long_text_enable_rich_text"] = True
|
|
return payload
|
|
|
|
def create_link_field(self, from_table_id: int, name: str, to_table_id: int) -> dict[str, Any]:
|
|
r = self.session.post(
|
|
f"{self.base_url}/api/database/fields/table/{from_table_id}/",
|
|
headers=self._headers(),
|
|
json={"name": name, "type": "link_row", "link_row_table_id": to_table_id},
|
|
)
|
|
if r.status_code >= 300:
|
|
print(f" [ERROR] link {name} resp={r.text}")
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
def rename_field(self, field_id: int, new_name: str) -> None:
|
|
r = self.session.patch(
|
|
f"{self.base_url}/api/database/fields/{field_id}/",
|
|
headers=self._headers(),
|
|
json={"name": new_name},
|
|
)
|
|
r.raise_for_status()
|
|
|
|
def create_formula_field(self, table_id: int, name: str, expression: str) -> int:
|
|
r = self.session.post(
|
|
f"{self.base_url}/api/database/fields/table/{table_id}/",
|
|
headers=self._headers(),
|
|
json={"name": name, "type": "formula", "formula": expression},
|
|
)
|
|
if r.status_code >= 300:
|
|
print(f" [ERROR] formula {name} expr={expression} resp={r.text}")
|
|
r.raise_for_status()
|
|
return r.json()["id"]
|
|
|
|
def seed(self, schema: dict[str, Any]) -> None:
|
|
print(f"\n[1/5] Login {self.base_url}")
|
|
self.login()
|
|
|
|
print(f"\n[2/5] Workspace '{schema['workspace_name']}'")
|
|
ws_id = self.get_or_create_workspace(schema["workspace_name"])
|
|
|
|
print(f"\n[3/5] Database '{schema['database_name']}'")
|
|
db_id = self.get_or_create_database(ws_id, schema["database_name"])
|
|
|
|
print(f"\n[4/5] Tables + primitive fields")
|
|
existing = self.list_tables(db_id)
|
|
table_ids: dict[str, int] = dict(existing)
|
|
for table in schema["tables"]:
|
|
tname = table["name"]
|
|
if tname in existing:
|
|
tid = existing[tname]
|
|
print(f" [table] Reuse '{tname}' id={tid}")
|
|
else:
|
|
tid = self.create_table_with_minimal(db_id, tname)
|
|
table_ids[tname] = tid
|
|
self._sync_fields(tid, table)
|
|
|
|
print(f"\n[5/6] Link fields (2nd pass)")
|
|
for link in schema["links"]:
|
|
from_id = table_ids[link["from_table"]]
|
|
to_id = table_ids[link["to_table"]]
|
|
existing_fields = {f["name"]: f for f in self.list_fields(from_id)}
|
|
if link["from_field"] in existing_fields:
|
|
print(f" [link] Reuse {link['from_table']}.{link['from_field']} -> {link['to_table']}")
|
|
continue
|
|
link_field = self.create_link_field(from_id, link["from_field"], to_id)
|
|
print(f" [link] Created {link['from_table']}.{link['from_field']} -> {link['to_table']}")
|
|
related_id = link_field.get("link_row_related_field_id") or link_field.get("link_row_related_field")
|
|
related_name = link.get("related_field_name")
|
|
if related_name and related_id:
|
|
self.rename_field(related_id, related_name)
|
|
print(f" [link] Renamed related field id={related_id} -> '{related_name}'")
|
|
|
|
print(f"\n[6/6] Formula fields (3rd pass)")
|
|
for f in schema.get("formulas", []):
|
|
tname = f["table"]
|
|
tid = table_ids[tname]
|
|
existing_fields = {x["name"]: x for x in self.list_fields(tid)}
|
|
if f["name"] in existing_fields:
|
|
print(f" [formula] Reuse {tname}.{f['name']}")
|
|
continue
|
|
self.create_formula_field(tid, f["name"], f["expression"])
|
|
print(f" [formula] Created {tname}.{f['name']}")
|
|
|
|
print("\n=== Seed OK ===")
|
|
print(f" Workspace: {schema['workspace_name']}")
|
|
print(f" Database : {schema['database_name']}")
|
|
print(f" Tables : {len(table_ids)}")
|
|
print(f" Links : {len(schema['links'])}")
|
|
print(f" Formulas : {len(schema.get('formulas', []))}")
|
|
|
|
def _sync_fields(self, table_id: int, table_def: dict[str, Any]) -> None:
|
|
existing = {f["name"]: f for f in self.list_fields(table_id)}
|
|
# Default Baserow table comes with one primary field "Name". Rename it to our primary if needed.
|
|
primary_target = table_def["primary_field"]
|
|
if primary_target not in existing:
|
|
primary_existing = next((f for f in existing.values() if f.get("primary")), None)
|
|
if primary_existing is not None:
|
|
# Find the primary field def
|
|
primary_def = next(f for f in table_def["fields"] if f.get("primary"))
|
|
self.update_field(primary_existing["id"], primary_def)
|
|
print(f" [field] Renamed primary -> '{primary_target}'")
|
|
existing[primary_target] = {**primary_existing, "name": primary_target}
|
|
if primary_existing["name"] != primary_target:
|
|
del existing[primary_existing["name"]]
|
|
|
|
for field_def in table_def["fields"]:
|
|
fname = field_def["name"]
|
|
if fname in existing:
|
|
continue
|
|
self.create_field(table_id, field_def)
|
|
print(f" [field] Created '{fname}' ({field_def['type']})")
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="Seed Baserow formation-hub")
|
|
parser.add_argument("--schema", default=str(Path(__file__).parent / "schema.json"))
|
|
args = parser.parse_args()
|
|
|
|
base_url = os.environ.get("BASEROW_URL", "http://localhost:8080")
|
|
email = os.environ.get("BASEROW_EMAIL")
|
|
password = os.environ.get("BASEROW_PASSWORD")
|
|
if not email or not password:
|
|
print("ERROR: set BASEROW_EMAIL et BASEROW_PASSWORD env vars", file=sys.stderr)
|
|
return 1
|
|
|
|
schema = json.loads(Path(args.schema).read_text())
|
|
seeder = BaserowSeed(base_url, email, password)
|
|
try:
|
|
seeder.seed(schema)
|
|
except requests.HTTPError as e:
|
|
print(f"\nHTTP error: {e}\nResponse: {e.response.text if e.response else ''}", file=sys.stderr)
|
|
return 2
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|