Files
curo1305 9fc820d893 feat(02-01): implement services/auth.py full auth service layer and email_tasks.py
- services/auth.py: Argon2 password hashing (pwdlib), constant-time verify (SEC-06)
- JWT create/decode for access tokens and password-reset tokens (typ claim validation, T-02-01)
- Refresh token lifecycle: create, rotate, revoke-all with family revocation (AUTH-07, RFC 9700)
- Family revocation enqueues send_security_alert_email.delay on token reuse (T-02-02)
- TOTP provisioning (pyotp) and verification with Redis replay prevention, valid_window=1 (AUTH-08)
- Backup code generation (8-char hex uppercase), storage (Argon2 hashed), constant-time verify (T-02-03)
- HIBP k-anonymity check via SHA-1 prefix (T-02-05), fail-open on network error (T-02-06)
- Admin bootstrap: idempotent, logs WARNING if env vars missing (D-04/D-05/D-06)
- services/email.py: SMTP send + dev stdout fallback (D-01/D-02)
- tasks/email_tasks.py: send_reset_email and send_security_alert_email Celery tasks
- celery_app.py: add email queue route for tasks.email_tasks.*
- TDD tests: 17 tests covering all auth primitives and family revocation
2026-05-22 19:23:42 +02:00

116 lines
4.5 KiB
Python

"""
Email service — pure Python, no FastAPI coupling.
Sends transactional emails via SMTP when SMTP_HOST is configured;
logs the content to stdout otherwise (D-02 dev fallback).
Security notes:
- Never raises: email failures are non-fatal; log and return
- Celery task wrapper handles retry/error reporting
"""
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
logger = logging.getLogger(__name__)
def send_password_reset_email(to_address: str, reset_link: str) -> None:
    """Send (or log) the password reset email.

    When ``settings.smtp_host`` is not configured (dev / CI), the reset link
    is logged instead of being sent (D-02 dev fallback). The API response is
    202 regardless — no token in the body. Otherwise a multipart message
    (plain text + HTML) is sent over SMTP with STARTTLS, logging in only when
    ``settings.smtp_user`` is set.

    Args:
        to_address: Recipient email address.
        reset_link: Password reset URL to embed in the message.

    Returns:
        None. Failures while building or sending the message are logged
        (with traceback) and swallowed, so email problems stay non-fatal.
    """
    import html  # function-scope import, same style as the deferred config import

    from config import settings  # deferred to avoid module-level side effects

    if not settings.smtp_host:
        # D-02: dev fallback — log token link to stdout
        logger.info("DEV MODE — password reset link for %s: %s", to_address, reset_link)
        return

    # Without a timeout smtplib falls back to the global socket default
    # (normally "block forever"), so an unresponsive server would hang the
    # calling worker.
    smtp_timeout_seconds = 10

    try:
        msg = MIMEMultipart("alternative")
        msg["Subject"] = "DocuVault — password reset"
        msg["From"] = settings.smtp_from
        msg["To"] = to_address

        text_body = (
            f"You requested a password reset for DocuVault.\n\n"
            f"Click the link below (valid for 1 hour):\n{reset_link}\n\n"
            "If you did not request this, ignore this email."
        )
        # The link lands inside a single-quoted HTML attribute: escape it so
        # that '&' in query strings is valid markup and a stray quote cannot
        # terminate the attribute.
        safe_link = html.escape(reset_link, quote=True)
        html_body = (
            f"<p>You requested a password reset for DocuVault.</p>"
            f"<p><a href='{safe_link}'>Reset your password</a> (valid 1 hour)</p>"
            f"<p>If you did not request this, ignore this email.</p>"
        )
        # Order matters for multipart/alternative: the last part attached
        # (HTML) is the one clients prefer to render.
        msg.attach(MIMEText(text_body, "plain"))
        msg.attach(MIMEText(html_body, "html"))

        with smtplib.SMTP(
            settings.smtp_host, settings.smtp_port, timeout=smtp_timeout_seconds
        ) as server:
            server.ehlo()
            # NOTE(review): starttls() is called without an explicit SSL
            # context; consider passing ssl.create_default_context() so the
            # server certificate is verified — confirm the deployment's SMTP
            # server presents a valid certificate before changing this.
            server.starttls()
            if settings.smtp_user:
                server.login(settings.smtp_user, settings.smtp_password)
            server.sendmail(settings.smtp_from, [to_address], msg.as_string())
        logger.info("Password reset email sent to %s", to_address)
    except Exception as exc:
        # Deliberate best-effort boundary: log (with traceback) and return.
        logger.exception(
            "Failed to send password reset email to %s: %s", to_address, exc
        )
def send_security_alert_email_sync(to_address: str, user_id: str) -> None:
    """Send (or log) a security alert email about suspicious refresh token reuse.

    Called by the email_tasks.py Celery task after looking up the user email.
    When ``settings.smtp_host`` is not configured, a warning is logged and no
    email is sent. Otherwise a multipart message (plain text + HTML) is sent
    over SMTP with STARTTLS, logging in only when ``settings.smtp_user`` is
    set.

    Args:
        to_address: Recipient email address.
        user_id: Identifier of the affected user; used only in log messages.

    Returns:
        None. Failures while building or sending the message are logged
        (with traceback) and swallowed, so email problems stay non-fatal.
    """
    from config import settings  # deferred import

    if not settings.smtp_host:
        logger.warning(
            "Security alert for user %s: suspicious refresh token reuse detected "
            "(SMTP not configured — email not sent to %s)",
            user_id,
            to_address,
        )
        return

    # Without a timeout smtplib falls back to the global socket default
    # (normally "block forever"), so an unresponsive server would hang the
    # calling worker.
    smtp_timeout_seconds = 10

    try:
        msg = MIMEMultipart("alternative")
        msg["Subject"] = "DocuVault — security alert: suspicious login detected"
        msg["From"] = settings.smtp_from
        msg["To"] = to_address

        text_body = (
            "DocuVault detected suspicious activity on your account.\n\n"
            "A previously revoked refresh token was used to attempt a session refresh. "
            "All active sessions have been revoked as a precaution.\n\n"
            "If this was not you, please change your password immediately."
        )
        # Static markup only — nothing caller-supplied is interpolated here,
        # so no HTML escaping is required.
        html_body = (
            "<p><strong>DocuVault security alert</strong></p>"
            "<p>A previously revoked refresh token was used to attempt a session refresh. "
            "All active sessions have been revoked as a precaution.</p>"
            "<p>If this was not you, please change your password immediately.</p>"
        )
        # Order matters for multipart/alternative: the last part attached
        # (HTML) is the one clients prefer to render.
        msg.attach(MIMEText(text_body, "plain"))
        msg.attach(MIMEText(html_body, "html"))

        with smtplib.SMTP(
            settings.smtp_host, settings.smtp_port, timeout=smtp_timeout_seconds
        ) as server:
            server.ehlo()
            # NOTE(review): starttls() is called without an explicit SSL
            # context; consider passing ssl.create_default_context() so the
            # server certificate is verified — confirm the deployment's SMTP
            # server presents a valid certificate before changing this.
            server.starttls()
            if settings.smtp_user:
                server.login(settings.smtp_user, settings.smtp_password)
            server.sendmail(settings.smtp_from, [to_address], msg.as_string())
        logger.info("Security alert email sent to %s (user_id=%s)", to_address, user_id)
    except Exception as exc:
        # Deliberate best-effort boundary: log (with traceback) and return.
        logger.exception(
            "Failed to send security alert email to %s (user_id=%s): %s",
            to_address, user_id, exc
        )