#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
═══════════════════════════════════════════════════════════════════════════════
 PocketBase 0.7.5 migration — split the RC and FAC journeys into 3 collections
═══════════════════════════════════════════════════════════════════════════════

CONTEXT
-------
The TPPC journey has already been split into 3 collections:
    tppc (main) / tppcprojet / tppctarif

The RC and FAC journeys are still "flat": everything lives in a single
collection (`rc` = 87 fields, `fac` = 51 fields). The new front/back code now
expects:

    RC  : rc  (main) + projetRC  + tarifRC   (relations rc.projetRC / rc.tarifRC)
    FAC : fac (main) + facprojet + factarif  (relations fac.projet  / fac.tarif)

This script:
  1. CREATES the 4 missing collections: projetRC, tarifRC, facprojet, factarif
     (_collections row + SQLite table + `_created_idx` index, exactly the way
     PocketBase 0.7.5 creates them — modelled on tppc*).
  2. ADDS to `rc` and `fac` the missing columns/relations expected by the code.
  3. MIGRATES all existing data (ALL journeys), dispatching every field to the
     right collection.

RELIABILITY PRINCIPLE: the migration is 100 % NON-DESTRUCTIVE.
  - Columns are ADDED; an existing column is never dropped nor renamed.
  - Data is COPIED (the old values stay in `rc`/`fac` as a fallback).
  - The `id`s of `rc`/`fac` do not change → the `contrat.rc` / `contrat.fac`
    relations stay valid.
  - Everything runs inside ONE SQLite transaction (atomic: all or nothing).
  - Idempotent: can be re-run safely (already migrated records are skipped).
  - Automatic backup of the data.db file before any write.

USAGE
-----
  ⚠️ STOP PocketBase first (exclusive access to the file), restart it afterwards.

  python3 migrate_split_rc_fac.py --db /path/to/pb_data/data.db   # migration
  python3 migrate_split_rc_fac.py --db .../data.db --dry-run       # simulation
  python3 migrate_split_rc_fac.py --db .../data.db --verify-only   # check only
  python3 migrate_split_rc_fac.py --db .../data.db --yes           # no prompt

No external dependency (Python 3 standard library only).
"""

import argparse
import datetime
import json
import os
import random
import shutil
import sqlite3
import sys

ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789"

# ─────────────────────────────────────────────────────────────────────────────
# SCHEMA SPECIFICATION
# ─────────────────────────────────────────────────────────────────────────────
# Each field = (name, type) where type ∈ {
#     "bool", "number", "text", "json",
#     ("select", maxSelect, [values]),
#     ("rel", "targetCollectionName"),
# }
# SQLite type of the backing column (identical to what PocketBase creates):
#     bool→Boolean DEFAULT FALSE | number→REAL DEFAULT 0 |
#     text/select/rel→TEXT DEFAULT '' | json→JSON DEFAULT NULL

# Name suffixes shared by several field families below. The ORDER matters:
# it becomes the column order of the created tables.
_RC_ACTIVITY_CHIPS = (
    "Voiturier", "Loueur", "Multimodal", "Douane", "DemPar", "DemParDom",
    "DemParAdv", "DemEntr", "DemInterne", "GardeMeuble", "EntDep",
    "PrestaLog", "Levageur", "Transitaire",
)
_RC_ACTIVITY_FAMILIES = (
    "Voiturier", "Commissionnaire", "Demenageur", "Logistique",
    "Autocariste", "Autres",
)
# Numeric suffixes of the prime/taux columns of tarifRC, and the column stems
# generated for each suffix.
_RC_TARIF_SUFFIXES = (250, 400, 2000)
_RC_TARIF_STEMS = (
    "primeRCC", "primeRCE", "primePJ", "primeTotal",
    "tauxRCC", "tauxRCE", "tauxGlobal",
)

# ── projetRC: "project" level of RC ≈ the old flat `rc` (+ complementary
#    activities as JSON)
PROJET_RC_FIELDS = (
    [
        (name, "json")
        for name in (
            "assureAdditionnel", "designationVehicule", "grilleMultimodal",
            "grilleTerrestre", "grilleAerien", "activitesVoiturier",
            "activitesCommissionnaire", "activitesDemenageur",
            "activitesLogistique",
        )
    ]
    # Activities (chip) + capital that may read "Nous consulter" → text
    + [
        field
        for chip in _RC_ACTIVITY_CHIPS
        for field in ((f"act{chip}", "bool"), (f"valueAct{chip}", "text"))
    ]
    # Goods
    + [
        (name, "bool")
        for name in (
            "marOrdinaire", "marRoulant", "marEngins", "marRoulantDem",
            "marMobilerUsag", "marPerissable", "marAnimaux", "marCiterne",
            "marBeton", "marExceptionnels", "marVrac",
        )
    ]
    # Zones
    + [(f"zone{i}", "bool") for i in range(1, 7)]
    # RCC / RCE extensions
    + [
        ("extRCCModifCalArrim", "bool"),
        ("extRCCFerroutage", "bool"),
        ("extRCCFraisRecons", "bool"),
        ("extRCCConfie", "bool"),
        ("typeExtConfies", "text"),
        ("extRCCTPPC", "bool"),
        ("extRCCRegie", "bool"),
        ("extRCCSansMontageDemontage", "bool"),
        ("autresRC", "bool"),
        ("extRCEBraDebra", "bool"),
        ("extRCEMontageDemontage", "bool"),
    ]
    # Time-related fields
    + [
        ("tempo", "text"),
        ("dateEffet", "text"),
        ("dateEcheance", "text"),
        ("dateFin", "text"),
        ("pj", "bool"),
        ("programmeInternationale", "bool"),
        ("participationResultat", "bool"),
    ]
    # Premiums (may be worth "Nous consulter" → text)
    + [
        (name, "text")
        for name in (
            "typeCot", "ca", "cotIrreductible",
            "tauxRCCHT", "tauxRCCTTC", "tauxRCEHT", "tauxRCETTC",
            "tauxTotalHT", "tauxTotalTTC",
            "cotRCCHT", "cotRCCTTC", "cotRCEHT", "cotRCETTC",
            "cotPJHT", "cotPJTTC", "cotTotalHT", "cotTotalTTC",
            "cotFraisHT", "cotFraisTTC",
        )
    ]
)

# ── tarifRC: "pricing" level of RC (computed quotation). No legacy source.
TARIF_RC_FIELDS = (
    [("sinistre", "number")]
    + [
        field
        for family in _RC_ACTIVITY_FAMILIES
        for field in ((f"pourcentage{family}", "number"), (f"isSet{family}", "bool"))
    ]
    + [
        (f"{stem}_{suffix}", "number")
        for suffix in _RC_TARIF_SUFFIXES
        for stem in _RC_TARIF_STEMS
    ]
    + [
        ("franchiseChoisie", "text"),
        ("checkDomImmat", "bool"),
        ("capitalDomImmat", "number"),
        ("checkContConf", "bool"),
        ("capitalContConf", "number"),
        ("checkDiffInv", "bool"),
        ("capitalDiffInv", "number"),
        ("checkStationLavage", "bool"),
        ("checkGarageInterne", "bool"),
        ("checkCSE", "bool"),
        ("checkTPPC", "bool"),
        ("capitalTPPC", "number"),
        ("vehiculesTPPC", "number"),
        ("checkPJ", "bool"),
        ("tarifcommercial", "number"),
    ]
)

# ── Columns to ADD to the (main) `rc` collection. zone1-6 already exist.
RC_MAIN_ADD = (
    [
        ("tarifRC", ("rel", "tarifRC")),
        ("projetRC", ("rel", "projetRC")),
        ("typeCotisation", "text"),
        ("chiffreAffaires", "number"),
        ("nombreVehicules", "number"),
        ("checkRCE", "bool"),
    ]
    + [
        field
        for family in _RC_ACTIVITY_FAMILIES
        for field in ((f"check{family}", "bool"), (f"capital{family}", "number"))
    ]
    # Only the first four families have an actCompl* column.
    + [(f"actCompl{family}", "json") for family in _RC_ACTIVITY_FAMILIES[:4]]
    + [(f"marchandises{family}", "json") for family in _RC_ACTIVITY_FAMILIES]
    + [("commentaire", "text")]
)

# ── facprojet: "project" level of FAC ≈ the "project" fields of the old `fac`.
FAC_PROJET_FIELDS = (
    [
        ("assureAdditionnel", "json"),
        ("valeurAssureeBase", "json"),  # multi-select → JSON array
        ("risqueTransport", "json"),    # multi-select → JSON array
        ("garOpt", "json"),             # multi-select → JSON array
        ("valeurAssuree", "text"),      # single-select → string
    ]
    + [
        (name, "text")
        for name in (
            "typeTPPC", "tempo", "typeContrat",
            "dateEffet", "dateEcheance", "dateFin",
            "lieuDepart", "lieuArrivee",
        )
    ]
    + [
        (name, "bool")
        for name in ("coassurance", "programmeInternational", "participationResultat")
    ]
    # Capitals/premiums: text (the code compares franchiseTransport !== "" and
    # renders the raw value)
    + [
        (name, "text")
        for name in (
            "capitalMax", "capitalColis", "capitalExped",
            "franchiseTransport", "cotAnnuelleTTC", "cotComptant",
        )
    ]
)

# ── factarif: "pricing" level of FAC (computed quotation). No legacy source.
FAC_TARIF_FIELDS = (
    [
        (name, "json")
        for name in (
            "fluxAchats", "fluxVentes", "franchise350", "franchise750",
            "sansFranchise", "asIf",
        )
    ]
    + [("fluxIntersites", "bool")]
    + [
        (name, "text")
        for name in (
            "typePolice", "typeRO", "conditionnement", "oldFranchise",
            "sinistres", "nbVehicTPPC", "typeFlux", "montantGarantir",
            "engagementRG", "selectedFranchise", "typeMarExpo",
            "commentaire", "zone", "transport",
        )
    ]
)

# JSON fields whose value is an OBJECT (not an array) → empty = "{}".
# Every other JSON field is an array → empty = "[]".
# (The generation code calls .includes()/.forEach()/.proposition without any
# guard: a valid JSON value is therefore always written, never NULL, to avoid
# any crash.)
OBJECT_JSON_FIELDS = {
    "fluxAchats",
    "fluxVentes",
    "franchise350",
    "franchise750",
    "sansFranchise",
}


def json_empty(fname):
    """Return the empty JSON literal for field `fname`.

    "{}" when the field is listed in OBJECT_JSON_FIELDS, "[]" otherwise.
    """
    if fname in OBJECT_JSON_FIELDS:
        return "{}"
    return "[]"


# ── Columns to ADD to the (main) `fac` collection.
FAC_MAIN_ADD = [
    ("projet", ("rel", "facprojet")),
    ("tarif", ("rel", "factarif")),
    ("nbVehicExpo", "text"),
    ("actAssuree", "text"),
    ("typeRG", "text"),
    ("multimodal", "text"),
    ("rg", "text"),
    ("primeHT", "text"),
    ("primeMini", "text"),
    ("zones", "json"),
    ("tppc", "bool"),
]

# FAC renames, keyed by the NEW main `fac` field, valued by the OLD `fac` field
# it is copied from. Copy only, non-destructive.
FAC_MAIN_RENAME = {
    "actAssuree": "actAssure",
    "nbVehicExpo": "marExpo",
    "typeRG": "typeGarantieRG",
    "rg": "risqueGuerre",
    "primeHT": "cotAnnuelleHT",
    "primeMini": "cotIrred",
}

# "Fallback" mapping: new main RC field ← old `rc` field
# (after FIELD_MAPPING of rc-data-manager.js).
RC_MAIN_FILL = {
    "chiffreAffaires": "ca",
    "typeCotisation": "typeCot",
    "checkRCE": "autresRC",
    "checkVoiturier": "actVoiturier",
    "capitalVoiturier": "valueActVoiturier",
    "checkCommissionnaire": "actMultimodal",
    "capitalCommissionnaire": "valueActMultimodal",
    "checkDemenageur": "actDemEntr",
    "capitalDemenageur": "valueActDemEntr",
    "checkLogistique": "actPrestaLog",
    "capitalLogistique": "valueActPrestaLog",
}


# ─────────────────────────────────────────────────────────────────────────────
# Low-level helpers
# ─────────────────────────────────────────────────────────────────────────────
def now_pb():
    """Return the current UTC time as 'YYYY-MM-DD HH:MM:SS.mmm'.

    The clock is read ONCE so that the seconds and the milliseconds always
    belong to the same instant. UTC is used because PocketBase serialises its
    own `created`/`updated` values in UTC.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.strftime("%Y-%m-%d %H:%M:%S.") + f"{now.microsecond // 1000:03d}"


def rand_id(n=15):
    """Return a random identifier of `n` characters drawn from ALPHABET."""
    return "".join(random.choices(ALPHABET, k=n))


def sqlite_coltype(ftype):
    """Return the SQLite column declaration for a base field type."""
    if ftype == "bool":
        return "Boolean DEFAULT FALSE"
    if ftype == "number":
        return "REAL DEFAULT 0"
    if ftype == "json":
        return "JSON DEFAULT NULL"
    # text, select, rel
    return "TEXT DEFAULT ''"


def pb_field_options(ftype, resolve_collection):
    """Return the PocketBase `options` block for a field type.

    `resolve_collection` maps a collection name to its id; it is only called
    for ("rel", name) types. Raises ValueError for an unknown type.
    """
    if ftype in ("bool", "json"):
        return {}
    if ftype == "number":
        return {"min": None, "max": None}
    if ftype == "text":
        return {"min": None, "max": None, "pattern": ""}
    if isinstance(ftype, tuple) and ftype[0] == "select":
        return {"maxSelect": ftype[1], "values": ftype[2]}
    if isinstance(ftype, tuple) and ftype[0] == "rel":
        return {
            "maxSelect": 1,
            "collectionId": resolve_collection(ftype[1]),
            "cascadeDelete": False,
        }
    raise ValueError(f"type inconnu: {ftype}")


def base_ftype(ftype):
    """Return the 'root' type name: the tag of a tuple type, else the type itself."""
    if isinstance(ftype, tuple):
        return ftype[0]  # 'select' or 'rel' → TEXT column
    return ftype


def _schema_entry(name, ftype, resolve_collection):
    """Build one field entry of a `_collections.schema` JSON array."""
    bt = base_ftype(ftype)
    return {
        "system": False,
        "id": rand_id(8),
        "name": name,
        "type": {"select": "select", "rel": "relation"}.get(bt, bt),
        "required": False,
        "unique": False,
        "options": pb_field_options(ftype, resolve_collection),
    }


def build_schema_json(fields, resolve_collection):
    """Return the list of schema entries for `fields` ((name, type) pairs)."""
    return [_schema_entry(name, ftype, resolve_collection) for name, ftype in fields]


# ─────────────────────────────────────────────────────────────────────────────
# DB schema access
# ─────────────────────────────────────────────────────────────────────────────
def table_columns(cur, table):
    """Return the column names of `table` (empty list if the table is absent)."""
    cur.execute(f"PRAGMA table_info(`{table}`)")
    return [r[1] for r in cur.fetchall()]


def collection_id(cur, name):
    """Return the id of the collection called `name`, or None if not found."""
    cur.execute("SELECT id FROM _collections WHERE name=?", (name,))
    r = cur.fetchone()
    return r[0] if r else None


def collection_exists(cur, name):
    """Tell whether a `_collections` row exists for `name`."""
    return collection_id(cur, name) is not None


# ─────────────────────────────────────────────────────────────────────────────
# Collection creation (_collections row + table + index)
# ─────────────────────────────────────────────────────────────────────────────
def create_collection(cur, name, fields, resolve_collection, log):
    """Create collection `name` unless it already exists; return its id.

    Inserts the `_collections` row, creates the backing table (id / created /
    updated + one column per field) and its `<name>_created_idx` index.
    """
    existing_id = collection_id(cur, name)
    if existing_id is not None:
        log(f" • collection '{name}' existe déjà → ignorée")
        return existing_id

    # Draw ids until one is not already used by another collection.
    cur.execute("SELECT id FROM _collections")
    taken = {r[0] for r in cur.fetchall()}
    cid = rand_id(15)
    while cid in taken:
        cid = rand_id(15)

    schema = build_schema_json(fields, resolve_collection)
    ts = now_pb()
    cur.execute(
        "INSERT INTO _collections "
        "(id, name, system, listRule, viewRule, createRule, updateRule, deleteRule, schema, created, updated) "
        "VALUES (?,?,?,?,?,?,?,?,?,?,?)",
        (cid, name, 0, None, None, None, None, None, json.dumps(schema), ts, ts),
    )

    cols_sql = [
        "`id` TEXT PRIMARY KEY",
        '`created` TEXT DEFAULT "" NOT NULL',
        '`updated` TEXT DEFAULT "" NOT NULL',
    ]
    cols_sql.extend(
        f'"{fname}" {sqlite_coltype(base_ftype(ftype))}' for fname, ftype in fields
    )
    cur.execute(f"CREATE TABLE `{name}` (" + ", ".join(cols_sql) + ")")
    cur.execute(f"CREATE INDEX `{name}_created_idx` ON `{name}` (`created`)")
    log(f" ✓ collection '{name}' créée (id={cid}, {len(fields)} champs)")
    return cid


def add_columns(cur, table, add_fields, resolve_collection, log):
    """Add fields to an existing collection (schema JSON + ALTER TABLE).

    A field is appended to the schema JSON only if no field of that name is
    there yet, and the column is added only if the table lacks it, so the
    function can be re-run. Raises LookupError when `table` has no
    `_collections` row.
    """
    cid = collection_id(cur, table)
    if cid is None:
        # Fail with an explicit message rather than on `None[0]` below.
        raise LookupError(f"collection '{table}' introuvable dans _collections")

    cur.execute("SELECT schema FROM _collections WHERE id=?", (cid,))
    schema = json.loads(cur.fetchone()[0])
    existing_field_names = {f["name"] for f in schema}
    existing_cols = set(table_columns(cur, table))

    added = []
    for fname, ftype in add_fields:
        # 1) schema entry
        if fname not in existing_field_names:
            schema.append(_schema_entry(fname, ftype, resolve_collection))
        # 2) SQLite column
        if fname not in existing_cols:
            cur.execute(
                f'ALTER TABLE `{table}` ADD COLUMN "{fname}" '
                f"{sqlite_coltype(base_ftype(ftype))}"
            )
            added.append(fname)

    cur.execute(
        "UPDATE _collections SET schema=?, updated=? WHERE id=?",
        (json.dumps(schema), now_pb(), cid),
    )
    if added:
        log(f" ✓ '{table}' : {len(added)} colonnes ajoutées ({', '.join(added)})")
    else:
        log(f" • '{table}' : aucune colonne à ajouter (déjà à jour)")


# ─────────────────────────────────────────────────────────────────────────────
# Value conversion for the copy
# ─────────────────────────────────────────────────────────────────────────────
def num_to_text(v):
    """REAL → clean string ('1000.0' → '1000', '12.5' → '12.5', None → '')."""
    if v is None:
        return ""
    if isinstance(v, float):
        return str(int(v)) if v.is_integer() else repr(v)
    return str(v)


def default_for(ftype):
    """Return the value written for a field that has no source data."""
    bt = base_ftype(ftype)
    if bt in ("bool", "number"):
        return 0
    if bt == "json":
        return None
    return ""  # text / select / rel


def copy_value(fname, ftype, raw):
    """Return the value to store in the target field when copying `raw`.

    - json: a usable value is always returned ('[]' array, '{}' object), never
      NULL, because the generation code calls .includes()/.forEach()/
      .proposition without any guard. The JSON literal `null` is treated like
      an empty value for the same reason.
    - text: a float source is converted to a clean string ('1000.0' → '1000').
    - other types: `raw`, or the type default when `raw` is None.
    """
    bt = base_ftype(ftype)
    if bt == "json":
        if raw is None or (isinstance(raw, str) and raw.strip() in ("", "null")):
            return json_empty(fname)
        return raw
    if bt == "text":
        if isinstance(raw, float):
            return num_to_text(raw)
        return raw if raw is not None else ""
    # bool / number / select / rel
    return raw if raw is not None else default_for(ftype)


# ─────────────────────────────────────────────────────────────────────────────
# Data migration
# ─────────────────────────────────────────────────────────────────────────────
def insert_record(cur, table, values: dict):
    """Insert `values` into `table` with a fresh id/created/updated; return the id."""
    while True:
        rid = rand_id(15)
        cur.execute(f"SELECT 1 FROM `{table}` WHERE id=?", (rid,))
        if cur.fetchone() is None:
            break

    ts = now_pb()
    cols = ["id", "created", "updated", *values]
    vals = [rid, ts, ts, *values.values()]
    placeholders = ",".join("?" * len(cols))
    quoted = ",".join(f'"{c}"' for c in cols)
    cur.execute(f"INSERT INTO `{table}` ({quoted}) VALUES ({placeholders})", vals)
    return rid


def _copy_fields(row, idx, source_cols, fields):
    """Map each (name, type) of `fields` to its value copied from `row` (None if no such column)."""
    return {
        fname: copy_value(fname, ftype, row[idx[fname]] if fname in source_cols else None)
        for fname, ftype in fields
    }


def _update_record(cur, table, rec_id, values):
    """Write `values` (column → value) and a fresh `updated` stamp on one record."""
    sets = ", ".join(f'"{k}"=?' for k in values)
    cur.execute(
        f"UPDATE {table} SET {sets}, updated=? WHERE id=?",
        [*values.values(), now_pb(), rec_id],
    )


def migrate_rc(cur, log, dry):
    """Split every not-yet-migrated `rc` record into projetRC / tarifRC.

    A record whose `projetRC` is already set is skipped (idempotence). With
    `dry` true the values are computed but nothing is written.
    Returns (migrated, skipped).
    """
    rc_cols = set(table_columns(cur, "rc"))
    main_types = dict(RC_MAIN_ADD)
    # New JSON columns of the main record: never NULL (the front parses them).
    json_defaults = {
        fname: "[]" for fname, ftype in RC_MAIN_ADD if base_ftype(ftype) == "json"
    }

    cur.execute("SELECT * FROM rc")
    rows = cur.fetchall()
    idx = {d[0]: i for i, d in enumerate(cur.description)}

    migrated = skipped = 0
    for row in rows:
        if "projetRC" in idx and row[idx["projetRC"]]:
            skipped += 1
            continue

        # 1) projetRC: project fields copied from rc (same names)
        projet_vals = _copy_fields(row, idx, rc_cols, PROJET_RC_FIELDS)
        # 2) tarifRC: default record (no legacy source)
        tarif_vals = {n: default_for(t) for n, t in TARIF_RC_FIELDS}
        # 3) main rc: empty JSON arrays + "fallback" fill; going through
        #    copy_value turns a NULL source into the default of the target type
        rc_update = dict(json_defaults)
        for dest, src in RC_MAIN_FILL.items():
            if src in rc_cols:
                rc_update[dest] = copy_value(
                    dest, main_types.get(dest, "text"), row[idx[src]]
                )

        migrated += 1
        if dry:
            continue

        rc_update["projetRC"] = insert_record(cur, "projetRC", projet_vals)
        rc_update["tarifRC"] = insert_record(cur, "tarifRC", tarif_vals)
        _update_record(cur, "rc", row[idx["id"]], rc_update)

    log(f" RC : {migrated} migrés, {skipped} déjà à jour (total {len(rows)})")
    return migrated, skipped


def migrate_fac(cur, log, dry):
    """Split every not-yet-migrated `fac` record into facprojet / factarif.

    A record whose `projet` is already set is skipped (idempotence). With
    `dry` true the values are computed but nothing is written.
    Returns (migrated, skipped).
    """
    fac_cols = set(table_columns(cur, "fac"))
    main_types = dict(FAC_MAIN_ADD)
    zone_names = ("zone1", "zone2", "zone3", "zone4", "zone5", "zone6")

    cur.execute("SELECT * FROM fac")
    rows = cur.fetchall()
    idx = {d[0]: i for i, d in enumerate(cur.description)}

    migrated = skipped = 0
    for row in rows:
        if "projet" in idx and row[idx["projet"]]:
            skipped += 1
            continue

        # 1) facprojet: project fields (same names, REAL→text when needed)
        projet_vals = _copy_fields(row, idx, fac_cols, FAC_PROJET_FIELDS)

        # 2) factarif: defaults (JSON arrays→[], objects→{} to avoid any crash)
        tarif_vals = {
            n: (json_empty(n) if base_ftype(t) == "json" else default_for(t))
            for n, t in FAC_TARIF_FIELDS
        }
        # Harmless default: keeps the pricing-document generation (which calls
        # getSelectedFranchise(selectedFranchise).proposition without a guard)
        # from failing on an old record that has no quotation.
        tarif_vals["selectedFranchise"] = "sansFranchise"

        # 3) main fac: renames + zones + relations + derived tppc
        fac_update = {}
        for dest, src in FAC_MAIN_RENAME.items():
            if src in fac_cols:
                # Same conversion as every other copy: float → clean string,
                # None → "", and a stored 0 is kept instead of being blanked.
                fac_update[dest] = copy_value(
                    dest, main_types.get(dest, "text"), row[idx[src]]
                )
        # zones JSON built from the truthy zone1..zone6 columns
        zones = [z for z in zone_names if z in fac_cols and row[idx[z]]]
        fac_update["zones"] = json.dumps(zones)
        # tppc derived from the old typeTPPC (filled ⇒ TPPC active)
        if "typeTPPC" in fac_cols:
            fac_update["tppc"] = 1 if row[idx["typeTPPC"]] not in (None, "") else 0

        migrated += 1
        if dry:
            continue

        fac_update["projet"] = insert_record(cur, "facprojet", projet_vals)
        fac_update["tarif"] = insert_record(cur, "factarif", tarif_vals)
        _update_record(cur, "fac", row[idx["id"]], fac_update)

    log(f" FAC : {migrated} migrés, {skipped} déjà à jour (total {len(rows)})")
    return migrated, skipped


# ─────────────────────────────────────────────────────────────────────────────
# Verification
# ─────────────────────────────────────────────────────────────────────────────
def verify(cur, log):
    """Run the post-migration checks, log each result, return True if all pass.

    When the structural checks (collections, tables, relation columns) fail,
    the data checks are skipped and False is returned: running them against a
    non-migrated schema would raise instead of reporting a failure.
    """
    ok = True

    def check(cond, msg):
        nonlocal ok
        log((" ✓ " if cond else " ✗ ") + msg)
        if not cond:
            ok = False

    def count(sql):
        cur.execute(sql)
        return cur.fetchone()[0]

    def finish():
        log("")
        log(" ═══ VÉRIFICATION " + ("RÉUSSIE ✓" if ok else "ÉCHOUÉE ✗") + " ═══")
        return ok

    # 1) collections present
    for c in ("projetRC", "tarifRC", "facprojet", "factarif"):
        check(collection_exists(cur, c), f"collection '{c}' présente")
        cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (c,))
        check(cur.fetchone() is not None, f"table SQLite '{c}' présente")

    # 2) added columns/relations
    rc_cols = set(table_columns(cur, "rc"))
    check({"projetRC", "tarifRC"}.issubset(rc_cols), "rc possède projetRC & tarifRC")
    fac_cols = set(table_columns(cur, "fac"))
    check({"projet", "tarif", "zones"}.issubset(fac_cols), "fac possède projet, tarif, zones")

    if not ok:
        return finish()

    # 3) relational integrity + coverage
    n_rc = count("SELECT COUNT(*) FROM rc")
    n_rc_mig = count("SELECT COUNT(*) FROM rc WHERE projetRC!='' AND projetRC IS NOT NULL")
    check(n_rc == n_rc_mig, f"tous les rc ont un projetRC ({n_rc_mig}/{n_rc})")
    check(
        count("SELECT COUNT(*) FROM rc r LEFT JOIN projetRC p ON r.projetRC=p.id "
              "WHERE r.projetRC!='' AND p.id IS NULL") == 0,
        "toutes les relations rc.projetRC résolvent",
    )
    check(
        count("SELECT COUNT(*) FROM rc r LEFT JOIN tarifRC t ON r.tarifRC=t.id "
              "WHERE r.tarifRC!='' AND t.id IS NULL") == 0,
        "toutes les relations rc.tarifRC résolvent",
    )

    n_fac = count("SELECT COUNT(*) FROM fac")
    n_fac_mig = count("SELECT COUNT(*) FROM fac WHERE projet!='' AND projet IS NOT NULL")
    check(n_fac == n_fac_mig, f"tous les fac ont un facprojet ({n_fac_mig}/{n_fac})")
    check(
        count("SELECT COUNT(*) FROM fac f LEFT JOIN facprojet p ON f.projet=p.id "
              "WHERE f.projet!='' AND p.id IS NULL") == 0,
        "toutes les relations fac.projet résolvent",
    )
    check(
        count("SELECT COUNT(*) FROM fac f LEFT JOIN factarif t ON f.tarif=t.id "
              "WHERE f.tarif!='' AND t.id IS NULL") == 0,
        "toutes les relations fac.tarif résolvent",
    )

    # 4) copy check: RC fields identical between rc and projetRC.
    #    NULL is compared as the default the copy writes (0 / ''), otherwise a
    #    NULL source boolean would be reported as a mismatch.
    projet_cols = set(table_columns(cur, "projetRC"))
    rc_diffs = [
        f'IFNULL(r."{col}",{empty}) != IFNULL(p."{col}",{empty})'
        for col, empty in (("actVoiturier", "0"), ("dateEffet", "''"), ("zone1", "0"))
        if col in rc_cols and col in projet_cols
    ]
    rc_mismatches = 0
    if rc_diffs:
        rc_mismatches = count(
            "SELECT COUNT(*) FROM rc r JOIN projetRC p ON r.projetRC=p.id WHERE "
            + " OR ".join(rc_diffs)
        )
    check(rc_mismatches == 0, "données RC copiées fidèlement (actVoiturier/dateEffet/zone1)")

    # 5) FAC check: facprojet received valeurAssuree
    fac_mismatches = 0
    if "valeurAssuree" in fac_cols:
        fac_mismatches = count(
            "SELECT COUNT(*) FROM fac f JOIN facprojet p ON f.projet=p.id "
            "WHERE IFNULL(f.valeurAssuree,'') != IFNULL(p.valeurAssuree,'')"
        )
    check(fac_mismatches == 0, "données FAC projet copiées (valeurAssuree)")

    # 6) zones JSON consistent with every legacy zone boolean that exists
    zone_cols = [f"zone{i}" for i in range(1, 7) if f"zone{i}" in fac_cols]
    zone_mismatches = 0
    if zone_cols:
        zone_mismatches = count(
            "SELECT COUNT(*) FROM fac WHERE "
            + " OR ".join(
                f"(\"{z}\" AND instr(IFNULL(zones,''),'{z}')=0)" for z in zone_cols
            )
        )
    check(zone_mismatches == 0, "fac.zones (JSON) cohérent avec zone1..6")

    # 7) contrat relations untouched
    check(
        count("SELECT COUNT(*) FROM contrat c LEFT JOIN rc r ON c.rc=r.id "
              "WHERE c.rc!='' AND r.id IS NULL") == 0,
        "relations contrat.rc toujours valides",
    )
    check(
        count("SELECT COUNT(*) FROM contrat c LEFT JOIN fac f ON c.fac=f.id "
              "WHERE c.fac!='' AND f.id IS NULL") == 0,
        "relations contrat.fac toujours valides",
    )

    return finish()


# ─────────────────────────────────────────────────────────────────────────────
# Orchestration
# ─────────────────────────────────────────────────────────────────────────────
def _confirm(prompt):
    """Ask `prompt` on stdin; True only for an 'o' or 'y' answer."""
    return input(prompt).strip().lower() in ("o", "y")


def run(db_path, dry_run, verify_only, assume_yes):
    """Run the migration (or the simulation / verification) on `db_path`.

    Returns the process exit code:
      0 success · 1 cancelled by the user · 2 database file not found ·
      3 verification failed (post-migration or --verify-only) · 4 error.
    """
    if not os.path.isfile(db_path):
        print(f"ERREUR : fichier introuvable : {db_path}", file=sys.stderr)
        return 2

    # Safety net: a non-checkpointed WAL ⇒ PocketBase may still be running.
    wal = db_path + "-wal"
    if os.path.isfile(wal) and os.path.getsize(wal) > 0 and not verify_only:
        print("⚠️ Un fichier -wal non vide est présent : PocketBase est-il bien ARRÊTÉ ?")
        if not assume_yes and not _confirm(" Continuer quand même ? [o/N] "):
            return 1

    def log(m):
        print(m)

    if verify_only:
        mode = "VÉRIFICATION SEULE"
    elif dry_run:
        mode = "SIMULATION (dry-run)"
    else:
        mode = "MIGRATION RÉELLE"
    log("═" * 70)
    log(" Migration RC/FAC → modèle éclaté (main / projet / tarif)")
    log(f" Base : {db_path}")
    log(f" Mode : {mode}")
    log("═" * 70)

    # backup
    if not dry_run and not verify_only:
        bak = db_path + ".bak-" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        shutil.copy2(db_path, bak)
        # A pending WAL holds committed pages that are not in the main file
        # yet: keep it next to the copy so the backup can be restored whole.
        if os.path.isfile(wal):
            shutil.copy2(wal, bak + "-wal")
        log(f" ✓ backup : {bak}")
        if not assume_yes and not _confirm(" Lancer la migration ? [o/N] "):
            log(" Annulé.")
            return 1

    conn = sqlite3.connect(db_path)
    conn.isolation_level = None  # the transaction is managed by hand
    cur = conn.cursor()
    in_transaction = False
    committed = False

    def resolver(name):
        return collection_id(cur, name)

    try:
        if verify_only:
            return 0 if verify(cur, log) else 3

        cur.execute("BEGIN")
        in_transaction = True

        log("\n[1/4] Création des collections manquantes")
        create_collection(cur, "projetRC", PROJET_RC_FIELDS, resolver, log)
        create_collection(cur, "tarifRC", TARIF_RC_FIELDS, resolver, log)
        create_collection(cur, "facprojet", FAC_PROJET_FIELDS, resolver, log)
        create_collection(cur, "factarif", FAC_TARIF_FIELDS, resolver, log)

        log("\n[2/4] Ajout des colonnes/relations à rc et fac")
        add_columns(cur, "rc", RC_MAIN_ADD, resolver, log)
        add_columns(cur, "fac", FAC_MAIN_ADD, resolver, log)

        log("\n[3/4] Migration des données")
        migrate_rc(cur, log, dry_run)
        migrate_fac(cur, log, dry_run)

        if dry_run:
            log("\n[dry-run] Aucune modification écrite → ROLLBACK")
            cur.execute("ROLLBACK")
            in_transaction = False
            return 0

        cur.execute("COMMIT")
        in_transaction = False
        committed = True

        log("\n[4/4] Vérification post-migration")
        ok = verify(cur, log)
        cur.execute("PRAGMA wal_checkpoint(TRUNCATE)")
        return 0 if ok else 3
    except Exception as e:
        if in_transaction:
            try:
                cur.execute("ROLLBACK")
            except sqlite3.Error:
                # Best effort: the connection is closed right below, which
                # discards any transaction still open.
                pass
        if committed:
            # The data IS migrated at this point: do not claim otherwise.
            log("\n✗ ERREUR après COMMIT — la migration est appliquée, mais la "
                f"vérification post-migration a échoué :\n {e}")
        elif verify_only:
            log(f"\n✗ ERREUR pendant la vérification (aucune modification effectuée) :\n {e}")
        else:
            log(f"\n✗ ERREUR — transaction annulée (ROLLBACK), aucune modification appliquée :\n {e}")
        import traceback
        traceback.print_exc()
        return 4
    finally:
        conn.close()


def main():
    """Parse the command line and exit with the code returned by run()."""
    ap = argparse.ArgumentParser(description="Migration PocketBase : éclatement RC/FAC.")
    ap.add_argument("--db", required=True, help="chemin vers pb_data/data.db")
    ap.add_argument("--dry-run", action="store_true", help="simulation (rollback systématique)")
    ap.add_argument("--verify-only", action="store_true", help="vérifie l'état sans rien modifier")
    ap.add_argument("--yes", action="store_true", help="ne pas demander de confirmation")
    args = ap.parse_args()
    sys.exit(run(args.db, args.dry_run, args.verify_only, args.yes))


if __name__ == "__main__":
    main()