9fc820d893
- services/auth.py: Argon2 password hashing (pwdlib), constant-time verify (SEC-06) - JWT create/decode for access tokens and password-reset tokens (typ claim validation, T-02-01) - Refresh token lifecycle: create, rotate, revoke-all with family revocation (AUTH-07, RFC 9700) - Family revocation enqueues send_security_alert_email.delay on token reuse (T-02-02) - TOTP provisioning (pyotp) and verification with Redis replay prevention, valid_window=1 (AUTH-08) - Backup code generation (8-char hex uppercase), storage (Argon2 hashed), constant-time verify (T-02-03) - HIBP k-anonymity check via SHA-1 prefix (T-02-05), fail-open on network error (T-02-06) - Admin bootstrap: idempotent, logs WARNING if env vars missing (D-04/D-05/D-06) - services/email.py: SMTP send + dev stdout fallback (D-01/D-02) - tasks/email_tasks.py: send_reset_email and send_security_alert_email Celery tasks - celery_app.py: add email queue route for tasks.email_tasks.* - TDD tests: 17 tests covering all auth primitives and family revocation
116 lines
4.5 KiB
Python
116 lines
4.5 KiB
Python
"""
Email service — pure Python, no FastAPI coupling.

Sends transactional emails via SMTP when SMTP_HOST is configured;
logs the content to stdout otherwise (D-02 dev fallback).

Security notes:
- Never raises: email failures are non-fatal; log and return
- Celery task wrapper handles retry/error reporting
"""
|
|
import html
import logging
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
|
|
|
|
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def send_password_reset_email(to_address: str, reset_link: str) -> None:
    """Send (or log) the password reset email.

    When ``settings.smtp_host`` is empty (dev / CI), the reset link is written
    to the module logger instead of being emailed (D-02). Otherwise a
    ``multipart/alternative`` message (plain text + HTML) is sent over SMTP
    with STARTTLS, logging in only when ``settings.smtp_user`` is set.

    Args:
        to_address: Recipient email address.
        reset_link: Password-reset URL embedded in both message bodies.

    Returns:
        None. Any exception raised while building or sending the message is
        logged at ERROR level (with traceback) and swallowed, so callers are
        never interrupted by a delivery failure.
    """
    from config import settings  # deferred to avoid module-level side effects

    if not settings.smtp_host:
        # D-02: dev fallback — log token link instead of sending mail
        logger.info(
            "DEV MODE — password reset link for %s: %s", to_address, reset_link
        )
        return

    try:
        msg = MIMEMultipart("alternative")
        msg["Subject"] = "DocuVault — password reset"
        msg["From"] = settings.smtp_from
        msg["To"] = to_address

        text_body = (
            "You requested a password reset for DocuVault.\n\n"
            f"Click the link below (valid for 1 hour):\n{reset_link}\n\n"
            "If you did not request this, ignore this email."
        )
        # Escape the link before placing it inside the single-quoted href so
        # that quotes or '&' in the URL cannot break out of the attribute.
        safe_link = html.escape(reset_link, quote=True)
        html_body = (
            "<p>You requested a password reset for DocuVault.</p>"
            f"<p><a href='{safe_link}'>Reset your password</a> (valid 1 hour)</p>"
            "<p>If you did not request this, ignore this email.</p>"
        )
        msg.attach(MIMEText(text_body, "plain"))
        msg.attach(MIMEText(html_body, "html"))

        # Bound the connection so a stalled SMTP server cannot hang the caller
        # (smtplib blocks indefinitely when no timeout is given).
        # NOTE(review): starttls() is called without an SSL context; consider
        # passing ssl.create_default_context() so the server certificate is
        # verified — confirm against the deployed SMTP relay first.
        with smtplib.SMTP(
            settings.smtp_host, settings.smtp_port, timeout=30
        ) as server:
            server.ehlo()
            server.starttls()
            if settings.smtp_user:
                server.login(settings.smtp_user, settings.smtp_password)
            server.sendmail(settings.smtp_from, [to_address], msg.as_string())

        logger.info("Password reset email sent to %s", to_address)
    except Exception as exc:
        # Best-effort delivery: record the failure with its traceback and
        # return normally; retry policy belongs to the calling task.
        logger.exception(
            "Failed to send password reset email to %s: %s", to_address, exc
        )
|
|
|
|
|
|
def send_security_alert_email_sync(to_address: str, user_id: str) -> None:
    """Send (or log) a security alert email about suspicious refresh token reuse.

    When ``settings.smtp_host`` is empty, a WARNING is logged and no email is
    sent. Otherwise a ``multipart/alternative`` message (plain text + HTML) is
    sent over SMTP with STARTTLS, logging in only when ``settings.smtp_user``
    is set.

    Args:
        to_address: Recipient email address.
        user_id: Identifier of the affected user; used only in log messages.

    Returns:
        None. Any exception raised while building or sending the message is
        logged at ERROR level (with traceback) and swallowed.
    """
    from config import settings  # deferred import

    if not settings.smtp_host:
        logger.warning(
            "Security alert for user %s: suspicious refresh token reuse detected "
            "(SMTP not configured — email not sent to %s)",
            user_id,
            to_address,
        )
        return

    try:
        msg = MIMEMultipart("alternative")
        msg["Subject"] = "DocuVault — security alert: suspicious login detected"
        msg["From"] = settings.smtp_from
        msg["To"] = to_address

        text_body = (
            "DocuVault detected suspicious activity on your account.\n\n"
            "A previously revoked refresh token was used to attempt a session refresh. "
            "All active sessions have been revoked as a precaution.\n\n"
            "If this was not you, please change your password immediately."
        )
        html_body = (
            "<p><strong>DocuVault security alert</strong></p>"
            "<p>A previously revoked refresh token was used to attempt a session refresh. "
            "All active sessions have been revoked as a precaution.</p>"
            "<p>If this was not you, please change your password immediately.</p>"
        )
        msg.attach(MIMEText(text_body, "plain"))
        msg.attach(MIMEText(html_body, "html"))

        # Bound the connection so a stalled SMTP server cannot hang the caller
        # (smtplib blocks indefinitely when no timeout is given).
        # NOTE(review): starttls() is called without an SSL context; consider
        # passing ssl.create_default_context() so the server certificate is
        # verified — confirm against the deployed SMTP relay first.
        with smtplib.SMTP(
            settings.smtp_host, settings.smtp_port, timeout=30
        ) as server:
            server.ehlo()
            server.starttls()
            if settings.smtp_user:
                server.login(settings.smtp_user, settings.smtp_password)
            server.sendmail(settings.smtp_from, [to_address], msg.as_string())

        logger.info(
            "Security alert email sent to %s (user_id=%s)", to_address, user_id
        )
    except Exception as exc:
        # Best-effort delivery: record the failure with its traceback and
        # return normally instead of propagating.
        logger.exception(
            "Failed to send security alert email to %s (user_id=%s): %s",
            to_address,
            user_id,
            exc,
        )
|