"""
Backend migration service.

Flow:
  1. POST /migrate → validate new backend (test_connection)
  2. Background task enumerates all objects in all known buckets
  3. Each object is copied old → new, then verified
  4. Only after 100 % success: atomically switch active backend
  5. Delete all objects from old backend
  6. If any copy fails: old backend stays active; state = "failed"
  7. DELETE /migrate cancels a running migration (old backend stays active)
"""
import logging
from dataclasses import dataclass, field
from typing import Literal

from app.services.backends.base import AbstractStorageBackend
from app.services.backend_manager import get_backend, switch_backend

logger = logging.getLogger(__name__)

# All logical buckets the service knows about — enumerated during migration.
KNOWN_BUCKETS = ["documents", "config"]

MigrationState = Literal[
    "idle",
    "validating",
    "migrating",
    "switching",
    "cleaning",
    "done",
    "failed",
    "cancelled",
]


@dataclass
class _MigrationStatus:
    """Mutable progress record for the (single, module-wide) migration."""

    state: MigrationState = "idle"
    total: int = 0  # number of objects enumerated for the current run
    done: int = 0  # objects copied and verified so far
    failed: int = 0  # objects whose copy or verification raised
    errors: list[str] = field(default_factory=list)  # one entry per failure


# Module-level singletons: one status record and one cancellation flag are
# shared by every caller of this module.
_status = _MigrationStatus()
_cancel_requested: bool = False


def get_status() -> dict:
    """Return a snapshot of the current migration status as a plain dict."""
    return {
        "state": _status.state,
        "total": _status.total,
        "done": _status.done,
        "failed": _status.failed,
        "errors": _status.errors[:50],  # cap to avoid huge responses
    }


def is_in_progress() -> bool:
    """Return True while a migration is in any non-terminal, non-idle state."""
    return _status.state in ("validating", "migrating", "switching", "cleaning")


async def cancel() -> bool:
    """Request cancellation of the running migration.

    Cancellation is only accepted while the state is ``"migrating"``; once the
    backend switch has started it can no longer be aborted.

    Returns:
        True if the request was recorded, False if no migration is in the
        ``"migrating"`` state.
    """
    global _cancel_requested
    if _status.state == "migrating":
        _cancel_requested = True
        return True
    return False


def _abort_if_cancelled() -> bool:
    """Mark the migration as cancelled if a cancel was requested.

    Returns True when the caller must stop, False to keep going.
    """
    if not _cancel_requested:
        return False
    _status.state = "cancelled"
    logger.info("Migration cancelled (%d/%d done)", _status.done, _status.total)
    return True


async def run_migration(new_backend: AbstractStorageBackend) -> None:
    """
    Background task: copy all objects to new_backend, verify, switch, clean old.

    Called after the caller has already validated new_backend.test_connection().

    The outcome is reported through the module-level status (see
    ``get_status``), never through the return value or an exception:

    * ``"cancelled"`` – a cancel request was seen before the switch; the old
      backend stays active.
    * ``"failed"`` – at least one object could not be copied/verified, or an
      unexpected error occurred; any error text is appended to ``errors``.
    * ``"done"`` – every enumerated object was copied, the active backend was
      switched, and deletion from the old backend was attempted (best-effort).

    Args:
        new_backend: Target backend that receives every object.
    """
    global _cancel_requested
    _cancel_requested = False
    old_backend = get_backend()

    _status.state = "migrating"
    # Reset every counter, including ``total``: otherwise get_status() would
    # report the previous run's total while the buckets are being listed.
    _status.total = 0
    _status.done = 0
    _status.failed = 0
    _status.errors.clear()

    try:
        # Collect all objects across every known bucket
        all_objects: list[tuple[str, str]] = []
        for bucket in KNOWN_BUCKETS:
            try:
                keys = await old_backend.list_keys(bucket)
                all_objects.extend((bucket, key) for key in keys)
            except Exception as exc:
                # NOTE(review): a bucket that cannot be listed is skipped, so
                # the migration can still end in "done" without its objects.
                # Presumably intended for buckets that do not exist yet —
                # confirm this should not fail the migration instead.
                logger.warning("Could not list bucket %r: %s", bucket, exc)

        _status.total = len(all_objects)
        logger.info(
            "Migration: %d objects to migrate across %d buckets",
            len(all_objects),
            len(KNOWN_BUCKETS),
        )

        for bucket, key in all_objects:
            if _abort_if_cancelled():
                return
            try:
                data = await old_backend.get(bucket, key)
                await new_backend.put(bucket, key, data)
                if not await new_backend.exists(bucket, key):
                    raise OSError("Verification failed: object absent after PUT")
                _status.done += 1
            except Exception as exc:
                _status.failed += 1
                entry = f"{bucket}/{key}: {exc}"
                _status.errors.append(entry)
                logger.warning("Migration copy failed — %s", entry)

        # A cancel accepted while the last object was being copied (or while
        # listing, when there was nothing to copy) has not been seen by the
        # loop above.  cancel() already returned True for it, so honour it
        # here rather than switching backends behind the caller's back.
        if _abort_if_cancelled():
            return

        if _status.failed > 0:
            _status.state = "failed"
            logger.error(
                "Migration failed: %d/%d objects could not be copied; old backend remains active",
                _status.failed,
                _status.total,
            )
            return

        # All objects verified — atomically switch
        _status.state = "switching"
        switch_backend(new_backend)

        # Remove all objects from old backend (best-effort)
        _status.state = "cleaning"
        for bucket, key in all_objects:
            try:
                await old_backend.delete(bucket, key)
            except Exception as exc:
                logger.warning("Cleanup failed for %s/%s: %s", bucket, key, exc)

        _status.state = "done"
        logger.info(
            "Migration complete: %d objects moved to %s",
            _status.total,
            new_backend.driver_name,
        )

    except Exception as exc:
        # Top-level boundary of the background task: record the failure in
        # the status instead of letting the exception escape.
        _status.state = "failed"
        _status.errors.append(f"Unexpected error: {exc}")
        logger.exception("Migration aborted with unexpected error")