kite/.planning/phases/04-folders-sharing-quotas-document-ux/04-06-PLAN.md
curo1305 747303246a docs(04): create phase 4 plan (9 plans, 7 waves)
Folders, Sharing, Quotas & Document UX — plans verified (0 blockers,
2 non-blocking warnings). Covers FOLD-01..05, SHARE-01..05, SEC-08/09,
ADMIN-06, DOC-01/02.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:20:16 +02:00


---
phase: 04-folders-sharing-quotas-document-ux
plan: 06
type: execute
wave: 4
depends_on:
- "04-03"
- "04-04"
files_modified:
- backend/api/audit.py
- backend/tasks/audit_tasks.py
- backend/celery_app.py
- backend/main.py
autonomous: true
requirements:
- ADMIN-06
must_haves:
truths:
- "Admin can retrieve paginated audit log entries filtered by date range, user, and action type"
- "Audit log entries never contain document content, filenames, or extracted text"
- "Regular user requesting audit log returns 403"
- "CSV export returns streaming response with correct Content-Disposition: attachment header"
- "Celery beat daily export task runs at midnight UTC and uploads CSV to audit-logs bucket"
- "MinIOBackend.put_object_raw() is used for the daily export (not the documents key scheme)"
artifacts:
- path: "backend/api/audit.py"
provides: "GET /api/admin/audit-log (paginated, filtered), GET /api/admin/audit-log/export (CSV stream)"
- path: "backend/tasks/audit_tasks.py"
provides: "audit_log_daily_export Celery task — queries DB, writes CSV, uploads to MinIO audit-logs bucket"
- path: "backend/celery_app.py"
provides: "beat_schedule extended with audit-log-daily-export at midnight UTC"
key_links:
- from: "backend/api/audit.py"
to: "backend/deps/auth.py"
via: "get_current_admin dep on all audit log endpoints"
pattern: "get_current_admin"
- from: "backend/tasks/audit_tasks.py"
to: "backend/storage/minio_backend.py"
via: "put_object_raw(bucket='audit-logs', key='audit-logs/YYYY-MM-DD.csv', ...)"
pattern: "put_object_raw"
---
<objective>
Implement the admin audit log viewer API (ADMIN-06) and the daily Celery beat export task (D-17).
This plan runs in parallel with plan 04-05 (both depend on Wave 2 completion, neither depends on each other).
Purpose: Give admins a paginated, filterable audit log and ensure log data is exported daily to MinIO.
Output: backend/api/audit.py + backend/tasks/audit_tasks.py + celery_app.py + main.py updates.
</objective>
<execution_context>
@$HOME/.claude/get-shit-done/workflows/execute-plan.md
@$HOME/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/phases/04-folders-sharing-quotas-document-ux/04-CONTEXT.md
@.planning/phases/04-folders-sharing-quotas-document-ux/04-PATTERNS.md
@.planning/phases/04-folders-sharing-quotas-document-ux/04-RESEARCH.md
@backend/api/admin.py
@backend/tasks/document_tasks.py
@backend/celery_app.py
@backend/db/models.py
</context>
<interfaces>
<!-- Key interfaces the executor needs. Extracted from codebase. -->
<!-- From backend/db/models.py — AuditLog model (read the actual file to confirm):
class AuditLog(Base):
__tablename__ = "audit_log"
id: Mapped[int] # primary key, auto-increment
event_type: Mapped[str]
user_id: Mapped[Optional[uuid.UUID]] # FK to users.id
actor_id: Mapped[Optional[uuid.UUID]] # FK to users.id
resource_id: Mapped[Optional[uuid.UUID]]
ip_address: Mapped[Optional[str]] # INET type in PostgreSQL
metadata_: mapped_column(JSONB, name="metadata") # ORM attr = metadata_; DB col = metadata
created_at: Mapped[datetime]
-->
<!-- _audit_to_dict() whitelist (from PATTERNS.md — MUST include ONLY these fields):
{
"id": entry.id,
"event_type": entry.event_type,
"user_id": str(entry.user_id) if entry.user_id else None,
"actor_id": str(entry.actor_id) if entry.actor_id else None,
"resource_id": str(entry.resource_id) if entry.resource_id else None,
"ip_address": str(entry.ip_address) if entry.ip_address else None,
"metadata_": entry.metadata_,
"created_at": entry.created_at.isoformat(),
}
# FORBIDDEN keys: filename, extracted_text, content (ADMIN-06, D-15)
-->
<!-- From backend/tasks/document_tasks.py — Celery task pattern (read actual file for exact imports):
@celery_app.task(name="tasks.document_tasks.extract_and_classify")
def extract_and_classify(document_id: str) -> dict:
return asyncio.run(_run(document_id))
async def _run(document_id: str) -> dict:
from db.session import AsyncSessionLocal # deferred import avoids circular
...
async with AsyncSessionLocal() as session:
...
-->
<!-- From backend/celery_app.py — existing beat_schedule (read actual file for exact structure):
celery_app.conf.beat_schedule = {
"cleanup-abandoned-uploads": {
"task": "tasks.document_tasks.cleanup_abandoned_uploads",
"schedule": _timedelta(minutes=30),
},
}
-->
</interfaces>
<tasks>
<task type="auto" tdd="true">
<name>Task 1: Create backend/api/audit.py — admin audit log viewer + CSV export</name>
<files>backend/api/audit.py, backend/main.py</files>
<read_first>
backend/api/admin.py — read the entire file; extract the get_current_admin dep usage pattern, the paginated list query pattern (lines ~140-160), the _user_to_dict() whitelist helper pattern, and how the router is prefixed
backend/db/models.py — read the AuditLog class fully; confirm the ORM attribute name is metadata_ (not metadata); confirm all column names
</read_first>
<behavior>
GET /api/admin/audit-log:
- Auth: get_current_admin (regular user → 403)
- Query params: start (Optional[datetime] = None), end (Optional[datetime] = None), user_id (Optional[uuid.UUID] = None), event_type (Optional[str] = None), page (int = 1, ge=1), per_page (int = 50, ge=1, le=500)
- Build SQLAlchemy query: select(AuditLog).order_by(AuditLog.created_at.desc())
- Apply filters: if start → .where(AuditLog.created_at >= start); if end → .where(AuditLog.created_at <= end); if user_id → .where(AuditLog.user_id == user_id); if event_type → .where(AuditLog.event_type == event_type)
- Apply pagination: .limit(per_page).offset((page - 1) * per_page)
- Also run a COUNT query with same filters (no limit/offset) for total
- Return {items: [_audit_to_dict(e) for e in entries], total: count, page: page, per_page: per_page}
GET /api/admin/audit-log/export:
- Auth: get_current_admin
- Query params: same filters as viewer (start, end, user_id, event_type), format: str = "csv"
- Query all matching rows (no pagination)
- For format="csv": use csv.DictWriter with io.StringIO; write all rows via _audit_to_dict(); return StreamingResponse(iter([output.getvalue()]), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=audit-export.csv"})
- CRITICAL: _audit_to_dict() must NEVER include filename, extracted_text, or document content keys
_audit_to_dict() helper: pure whitelist dict — id, event_type, user_id, actor_id, resource_id, ip_address, metadata_, created_at — no other keys possible.
Register router in main.py.
</behavior>
<action>
Create backend/api/audit.py.
Imports: `from __future__ import annotations`, `import csv`, `import io`, `import uuid`, `from datetime import datetime`, `from typing import Optional`, `from fastapi import APIRouter, Depends, Query`, `from fastapi.responses import StreamingResponse`, `from pydantic import BaseModel`, `from sqlalchemy import select, func`, `from sqlalchemy.ext.asyncio import AsyncSession`, `from db.models import AuditLog`, `from deps.auth import get_current_admin`, `from deps.db import get_db`.
Router: `router = APIRouter(prefix="/api/admin", tags=["audit"])`.
Define _audit_to_dict() as a module-level function. The whitelist is: id, event_type, user_id (str or None), actor_id (str or None), resource_id (str or None), ip_address (str or None), metadata_ (the JSONB dict value), created_at (isoformat). The function MUST NOT include filename, extracted_text, or any document content fields. Add a docstring: "Safe audit log serializer — never includes filename, extracted_text, or document content (ADMIN-06, D-15)."
Define _build_query() helper (or inline) that accepts filters and returns a SQLAlchemy Select statement with filters applied. Reuse for both list and export endpoints.
Implement GET /api/admin/audit-log and GET /api/admin/audit-log/export as specified.
After creating audit.py, modify backend/main.py: add `from api.audit import router as audit_router` and `app.include_router(audit_router)`.
</action>
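The pagination rules from the behavior block (1-based `page`, `per_page` capped at 500, and the `{items, total, page, per_page}` envelope) reduce to a small amount of arithmetic. The sketch below assumes two hypothetical helpers, `page_window` and `envelope`, which are not part of the plan. In the real endpoint, FastAPI's `Query(ge=..., le=...)` validation would replace the explicit range check.

```python
from typing import Any


def page_window(page: int, per_page: int) -> tuple[int, int]:
    """Return (limit, offset) for a 1-based page number."""
    if page < 1 or not 1 <= per_page <= 500:
        raise ValueError("page must be >= 1 and per_page within 1..500")
    return per_page, (page - 1) * per_page


def envelope(items: list[dict[str, Any]], total: int, page: int, per_page: int) -> dict[str, Any]:
    """Build the response body; total comes from the separate COUNT query."""
    return {"items": items, "total": total, "page": page, "per_page": per_page}
```

The returned pair maps directly onto `.limit(limit).offset(offset)` on the filtered select statement.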
<verify>
<automated>cd /Users/nik/Documents/Progamming/document_scanner/backend && python -m pytest tests/test_audit.py -x -v --no-header 2>&1 | tail -30</automated>
</verify>
<acceptance_criteria>
- backend/api/audit.py exists with GET /api/admin/audit-log and GET /api/admin/audit-log/export endpoints
- Both endpoints use get_current_admin dep (grep: `Depends(get_current_admin)` appears twice in audit.py)
- _audit_to_dict() whitelist does NOT contain filename, extracted_text, password_hash, or credentials_enc (grep: these strings absent from the dict literal in _audit_to_dict)
- CSV export returns StreamingResponse with Content-Disposition: attachment (grep: `attachment` in audit.py)
- `python -c "from api.audit import router"` exits 0
- test_audit_log_regular_user_403 turns green or remains xfail — not FAILED
- test_audit_log_no_doc_content turns green or remains xfail — not FAILED
- `cd backend && python -m pytest tests/ -x --no-header 2>&1 | grep -E "^FAILED"` returns nothing
</acceptance_criteria>
<done>Audit log viewer and CSV export implemented; admin-only access confirmed; no doc content in serializer.</done>
</task>
<task type="auto">
<name>Task 2: Create backend/tasks/audit_tasks.py + extend celery_app.py beat schedule</name>
<files>backend/tasks/audit_tasks.py, backend/celery_app.py</files>
<read_first>
backend/tasks/document_tasks.py — read the entire file; extract the Celery task decorator pattern, the asyncio.run(_run()) bridge pattern, the AsyncSessionLocal usage with deferred imports inside the async function body, and the error handling pattern
backend/celery_app.py — read the entire file; extract the beat_schedule dict structure, how _timedelta is imported, and where to add the crontab import and new beat entry; find the task_routes dict
</read_first>
<behavior>
audit_log_daily_export Celery task:
- Sync Celery entry point: `@celery_app.task(name="tasks.audit_tasks.audit_log_daily_export") def audit_log_daily_export() -> dict: return asyncio.run(_run_daily_export())`
- _run_daily_export() async function:
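- Compute yesterday in UTC so the date agrees with the UTC start/end bounds regardless of the worker's local timezone: `yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)` using `from datetime import date, datetime, timedelta, timezone`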
- start = datetime(yesterday.year, yesterday.month, yesterday.day, tzinfo=timezone.utc)
- end = start + timedelta(days=1)
- Open AsyncSessionLocal() session; query AuditLog where created_at >= start AND created_at < end; order by created_at
- Build CSV using csv.DictWriter + io.StringIO; use the same field list as _audit_to_dict: id, event_type, user_id, actor_id, resource_id, ip_address, metadata_, created_at
- csv_bytes = output.getvalue().encode("utf-8")
- key = f"audit-logs/{yesterday.isoformat()}.csv"
- Call `await get_storage_backend().put_object_raw(bucket="audit-logs", key=key, data=io.BytesIO(csv_bytes), length=len(csv_bytes), content_type="text/csv")`
- Return {"exported": len(rows), "key": key, "date": yesterday.isoformat()}
- Error handling: wrap the entire _run_daily_export in try/except; on exception: log error and return {"exported": 0, "error": str(e)}
celery_app.py beat_schedule extension:
- Import `from celery.schedules import crontab as _crontab` (alias with underscore like _timedelta)
- Add to beat_schedule dict: "audit-log-daily-export" → {"task": "tasks.audit_tasks.audit_log_daily_export", "schedule": _crontab(hour=0, minute=0)}
- Add to task_routes: "tasks.audit_tasks.*": {"queue": "documents"} (reuse documents worker queue per PATTERNS.md)
</behavior>
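The export window and object key described above can be computed in isolation, which makes the half-open interval (`created_at >= start AND created_at < end`) easy to check. This is a sketch under one assumption: the day boundary is taken in UTC, to match the `tzinfo=timezone.utc` bounds. `export_window` and its `now` parameter are hypothetical, added only so the function can be exercised with a fixed clock.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def export_window(now: Optional[datetime] = None) -> tuple[datetime, datetime, str]:
    """Return (start, end, key) covering the previous UTC calendar day."""
    # Assumption: callers pass a timezone-aware datetime, or nothing at all.
    now = now or datetime.now(timezone.utc)
    yesterday = now.astimezone(timezone.utc).date() - timedelta(days=1)
    start = datetime(yesterday.year, yesterday.month, yesterday.day, tzinfo=timezone.utc)
    end = start + timedelta(days=1)  # exclusive upper bound
    key = f"audit-logs/{yesterday.isoformat()}.csv"
    return start, end, key
```

For a run shortly after midnight UTC on 2026-05-25, the window covers all of 2026-05-24 and the key is `audit-logs/2026-05-24.csv`.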
<action>
Create backend/tasks/audit_tasks.py. Module docstring: describe the daily audit export task and its MinIO target.
Top-level imports: `import asyncio`, `from celery_app import celery_app`. All other imports go inside the async function body to avoid circular imports (per established pattern from document_tasks.py).
Inside _run_daily_export() body (deferred imports): `from datetime import date, datetime, timedelta, timezone`, `import csv`, `import io`, `from db.session import AsyncSessionLocal`, `from db.models import AuditLog`, `from sqlalchemy import select`, `from storage import get_storage_backend`.
Implement the task and async function exactly as specified in the behavior block.
Modify backend/celery_app.py: add `from celery.schedules import crontab as _crontab` to imports. Add the new beat entry to `celery_app.conf.beat_schedule`. Add the task route to `celery_app.conf.task_routes`.
</action>
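The sync-to-async bridge and the error contract (`{"exported": 0, "error": ...}` on failure) can be illustrated without Celery or a database. In this sketch `_export_rows` is a hypothetical placeholder for the query, CSV, and upload steps, and it raises on purpose to exercise the error path. The `@celery_app.task(...)` decorator is omitted so the snippet stays self-contained.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def _export_rows() -> dict:
    """Hypothetical placeholder for query + CSV + put_object_raw."""
    raise RuntimeError("storage unavailable")


async def _run_daily_export() -> dict:
    # Wrap the whole body so a failure is logged and reported, not raised.
    try:
        return await _export_rows()
    except Exception as e:
        logger.exception("audit log daily export failed")
        return {"exported": 0, "error": str(e)}


def audit_log_daily_export() -> dict:
    # In the real module this function carries the Celery task decorator.
    return asyncio.run(_run_daily_export())
```

Returning a dict instead of re-raising means the task finishes normally from Celery's point of view, so a failed export is visible only in the log line and the task result.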
<verify>
<automated>cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "from tasks.audit_tasks import audit_log_daily_export; print('OK')"</automated>
</verify>
<acceptance_criteria>
- backend/tasks/audit_tasks.py exists and is importable without error
- `python -c "from tasks.audit_tasks import audit_log_daily_export"` exits 0
- audit_log_daily_export is registered with name "tasks.audit_tasks.audit_log_daily_export" (grep: `name="tasks.audit_tasks.audit_log_daily_export"`)
- _run_daily_export calls put_object_raw with bucket="audit-logs" (grep: `put_object_raw` and `audit-logs` in audit_tasks.py)
- celery_app.py beat_schedule contains "audit-log-daily-export" entry with `_crontab(hour=0, minute=0)` (grep: `audit-log-daily-export` and `crontab` in celery_app.py)
- celery_app.py task_routes contains `tasks.audit_tasks.*` (grep confirms)
- All deferred imports are inside the async function body, not at module top level (grep: import lines inside `async def _run_daily_export`)
- `cd backend && python -m pytest tests/ -x --no-header 2>&1 | grep -E "^FAILED"` returns nothing
</acceptance_criteria>
<done>Celery export task importable; beat schedule updated; put_object_raw used for audit-logs bucket writes.</done>
</task>
</tasks>
<threat_model>
## Trust Boundaries
| Boundary | Description |
|----------|-------------|
| Admin → GET /api/admin/audit-log | Admin-authenticated; regular users cannot access |
| Celery worker → MinIO audit-logs bucket | Service-to-service; uses env-var credentials; bucket is private |
## STRIDE Threat Register
| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-04-06-01 | Elevation of Privilege | GET /api/admin/audit-log access by regular user | mitigate | get_current_admin dep: regular user role → 403; test_audit_log_regular_user_403 validates |
| T-04-06-02 | Information Disclosure | Audit log returning document content | mitigate | _audit_to_dict() whitelist explicitly excludes filename, extracted_text; no other fields can be added without modifying the dict literal (safe-by-default) |
| T-04-06-03 | Information Disclosure | CSV export containing sensitive data | mitigate | CSV export uses the same _audit_to_dict() whitelist as the JSON viewer; both share the same helper function |
| T-04-06-04 | Tampering | audit-logs MinIO bucket publicly accessible | mitigate | Bucket created without public policy (MinIO default is private); confirmed in migration 0004 |
| T-04-06-05 | Denial of Service | Unbounded CSV export | accept | Export scoped by same date/user/event_type filters as viewer; max rows bounded by time window (1 day per task run); admin-only endpoint |
| T-04-SC | Tampering | npm/pip/cargo installs | accept | No new packages installed in this plan |
</threat_model>
<verification>
1. Audit log tests: `cd backend && python -m pytest tests/test_audit.py -v --no-header`
2. Admin-only check: `grep -n "get_current_admin" backend/api/audit.py` — must appear on both endpoints
3. No doc content in serializer: `grep -n "filename\|extracted_text\|password_hash\|credentials_enc" backend/api/audit.py` may match only the _audit_to_dict docstring or comments, never a key in the returned dict literal
4. Celery task import: `cd backend && python -c "from tasks.audit_tasks import audit_log_daily_export; print('OK')"`
5. Full suite: `cd backend && python -m pytest tests/ -v --no-header 2>&1 | grep -E "FAILED|ERROR"`
</verification>
<success_criteria>
- Audit log viewer returns paginated, filtered entries with no document content in any field
- CSV export streams valid CSV with Content-Disposition: attachment
- Daily Celery task exports to audit-logs MinIO bucket using put_object_raw
- Beat schedule updated; task route registered
- test_audit.py tests green or xfail; zero FAILED in full suite
</success_criteria>
<output>
Create `.planning/phases/04-folders-sharing-quotas-document-ux/04-06-SUMMARY.md` when done.
</output>