Files
kite/backend/tasks/email_tasks.py
T
curo1305 9fc820d893 feat(02-01): implement services/auth.py full auth service layer and email_tasks.py
- services/auth.py: Argon2 password hashing (pwdlib), constant-time verify (SEC-06)
- JWT create/decode for access tokens and password-reset tokens (typ claim validation, T-02-01)
- Refresh token lifecycle: create, rotate, revoke-all with family revocation (AUTH-07, RFC 9700)
- Family revocation enqueues send_security_alert_email.delay on token reuse (T-02-02)
- TOTP provisioning (pyotp) and verification with Redis replay prevention, valid_window=1 (AUTH-08)
- Backup code generation (8-char hex uppercase), storage (Argon2 hashed), constant-time verify (T-02-03)
- HIBP k-anonymity check via SHA-1 prefix (T-02-05), fail-open on network error (T-02-06)
- Admin bootstrap: idempotent, logs WARNING if env vars missing (D-04/D-05/D-06)
- services/email.py: SMTP send + dev stdout fallback (D-01/D-02)
- tasks/email_tasks.py: send_reset_email and send_security_alert_email Celery tasks
- celery_app.py: add email queue route for tasks.email_tasks.*
- TDD tests: 17 tests covering all auth primitives and family revocation
2026-05-22 19:23:42 +02:00

74 lines
2.8 KiB
Python

"""
Celery tasks for email dispatch in DocuVault.
Tasks follow the same pattern as document_tasks.py:
- Plain sync def (Celery workers have no asyncio event loop by default)
- Async body via asyncio.run()
- Project imports (services.*, db.*) deferred inside _run_* functions to avoid circular imports
(see celery_app.py comment — do NOT import config at module level); only asyncio and celery_app are imported at module level
Tasks registered here:
send_reset_email — dispatches a password-reset email to the user
send_security_alert_email — dispatches a security alert on refresh token reuse (AUTH-07)
"""
import asyncio
from celery_app import celery_app
@celery_app.task(name="tasks.email_tasks.send_reset_email")
def send_reset_email(to_address: str, reset_link: str) -> dict:
    """Celery task: dispatch a password-reset email.

    Enqueue with ``send_reset_email.delay(to_address, reset_link)``.

    The task is a plain synchronous function; it builds the
    ``_run_send_reset`` coroutine and drives it to completion with
    ``asyncio.run()``.

    Args:
        to_address: Recipient address, forwarded unchanged to the async body.
        reset_link: Reset link, forwarded unchanged to the async body.

    Returns:
        Whatever ``_run_send_reset`` returns (a status dict).
    """
    pending = _run_send_reset(to_address, reset_link)
    return asyncio.run(pending)
async def _run_send_reset(to_address: str, reset_link: str) -> dict:
    """Async body of ``send_reset_email``.

    Imports are deferred to call time to avoid circular imports at module
    load.

    Args:
        to_address: Recipient address passed to ``send_password_reset_email``.
        reset_link: Reset link passed to ``send_password_reset_email``.

    Returns:
        ``{"status": "sent", "to": to_address}`` when the sender returns
        normally, otherwise ``{"status": "failed", "error": <message>}``.
        Sender exceptions are never propagated to the caller.
    """
    import logging  # noqa: PLC0415

    from services.email import send_password_reset_email  # noqa: PLC0415

    try:
        send_password_reset_email(to_address, reset_link)
    except Exception as exc:
        # Best-effort delivery: the failure is reported through the return
        # value, but it is also logged with a traceback so it does not vanish.
        # The reset link is a credential and is deliberately NOT logged.
        logging.getLogger(__name__).exception(
            "Password reset email to %s failed", to_address
        )
        return {"status": "failed", "error": str(exc)}
    return {"status": "sent", "to": to_address}
@celery_app.task(name="tasks.email_tasks.send_security_alert_email")
def send_security_alert_email(user_id: str) -> dict:
    """Celery task: dispatch a security alert email for a user.

    Enqueue with ``send_security_alert_email.delay(user_id)``.

    The task is a plain synchronous function; it builds the
    ``_run_send_security_alert`` coroutine and drives it to completion with
    ``asyncio.run()``. The recipient address is not passed in: the async body
    looks the user up in the database from ``user_id``.

    Args:
        user_id: String form of the user's UUID.

    Returns:
        Whatever ``_run_send_security_alert`` returns (a status dict).
    """
    pending = _run_send_security_alert(user_id)
    return asyncio.run(pending)
async def _run_send_security_alert(user_id: str) -> dict:
    """Async body of ``send_security_alert_email``.

    Parses ``user_id`` as a UUID, loads the matching ``User`` row, and passes
    the user's email address to ``send_security_alert_email_sync``.

    Args:
        user_id: String form of the user's UUID.

    Returns:
        One of:
        - ``{"status": "sent", "user_id": user_id}`` on success;
        - ``{"status": "failed", "error": "Invalid user_id: ..."}`` when
          ``user_id`` cannot be parsed as a UUID (including ``None`` and
          other non-string values);
        - ``{"status": "failed", "error": "User ... not found"}`` when no
          such row exists;
        - ``{"status": "failed", "error": <message>}`` when the lookup or
          the send raises. Such exceptions are logged, not propagated.
    """
    import logging  # noqa: PLC0415
    import uuid as _uuid  # noqa: PLC0415

    # Validate before touching any project module: bad input is rejected
    # cheaply. UUID() raises TypeError for None and AttributeError for other
    # non-string values, not just ValueError for malformed strings.
    try:
        user_uuid = _uuid.UUID(user_id)
    except (ValueError, TypeError, AttributeError):
        return {"status": "failed", "error": f"Invalid user_id: {user_id}"}

    # Project imports are deferred to call time to avoid circular imports.
    from db.session import AsyncSessionLocal  # noqa: PLC0415
    from db.models import User  # noqa: PLC0415
    from services.email import send_security_alert_email_sync  # noqa: PLC0415

    try:
        async with AsyncSessionLocal() as session:
            user = await session.get(User, user_uuid)
            if user is None:
                return {"status": "failed", "error": f"User {user_id} not found"}
            # Read the address while the session is still open.
            to_address = user.email
        # The session is closed here, so no DB resources are held while the
        # (synchronous) email send runs.
        send_security_alert_email_sync(to_address, user_id)
    except Exception as exc:
        # Best-effort delivery: report through the return value, but keep a
        # traceback in the logs so the failure is diagnosable.
        logging.getLogger(__name__).exception(
            "Security alert email for user %s failed", user_id
        )
        return {"status": "failed", "error": str(exc)}
    return {"status": "sent", "user_id": user_id}