Add profile feature, input sanitization, and stronger security checks

Backend:
- app/core/sanitize.py: shared sanitize_str, normalize_email, validate_phone,
  validate_date_of_birth — applied to every user-supplied DB-bound input
- app/schemas/user.py: sanitize full_name, normalize email on UserCreate
- app/models/profile.py: profiles table (position, phone, dob, address, updated_at)
- app/models/user.py: Profile back-ref, is_superuser admin-role comment
- app/schemas/profile.py: ProfileRead/ProfileUpdate with full sanitization
- app/routers/profile.py: GET+PUT /api/profile/me (lazy profile creation)
- app/main.py: register /api/profile router
- alembic migration 676084df61d1: create profiles table

Frontend:
- components/Nav.tsx: shared nav (Dashboard | Profile | Logout)
- pages/ProfilePage.tsx: profile view + inline edit form with error handling
- pages/DashboardPage.tsx: use Nav component
- api/client.ts: ProfileData type, getProfile, updateProfile
- App.tsx: /profile private route

Security:
- scripts/security_check.py: tighter SQL injection patterns (f-string/format/%
  in execute/query/text()), new SANIT category for raw request→DB patterns

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
curo1305
2026-04-13 18:15:47 +02:00
parent e117a33a73
commit 343f12259c
18 changed files with 547 additions and 16 deletions
+5 -1
View File
@@ -13,8 +13,12 @@ A fullstack SaaS web application built with FastAPI, React, and PostgreSQL.
## Current State ## Current State
- User registration and login (JWT auth) - User registration and login (JWT auth)
- Protected dashboard route - Protected dashboard with nav bar (Dashboard | Profile | Logout)
- `/api/users/me` — authenticated user info - `/api/users/me` — authenticated user info
- `/api/profile/me` — GET/PUT personal profile (position, phone, date of birth, address)
- Profile data stored in a dedicated `profiles` table; auto-created on first access
- Admin role flag (`is_superuser`) stored in `users` table; hidden from all API responses
- All input sanitized before reaching the DB (null-byte rejection, length caps, format validation)
- 3 separate Docker containers: `db` (PostgreSQL), `backend` (FastAPI), `frontend` (nginx) - 3 separate Docker containers: `db` (PostgreSQL), `backend` (FastAPI), `frontend` (nginx)
- All containers run as non-root users (UID 1001 for backend and frontend, UID 70 for db) - All containers run as non-root users (UID 1001 for backend and frontend, UID 70 for db)
- Dev environment seeds a test user automatically on startup (`test@example.com` / `Test123!`) - Dev environment seeds a test user automatically on startup (`test@example.com` / `Test123!`)
+6
View File
@@ -1,5 +1,11 @@
# TODO # TODO
## Frontend features
- [x] **Logout button** — visible when logged in, clears token and redirects to `/login`
- [x] **Profile page** (`/profile`) — shows personal information for the logged-in user
- [x] **Edit & save profile** — form to update personal details, stored in a dedicated `profiles` table (separate from `users`, same PostgreSQL container)
## Infrastructure ## Infrastructure
- [x] **Rootless containers** — run backend and frontend containers as non-root users (add `USER` directive to Dockerfiles, map UID/GID appropriately) - [x] **Rootless containers** — run backend and frontend containers as non-root users (add `USER` directive to Dockerfiles, map UID/GID appropriately)
@@ -0,0 +1,40 @@
"""add profiles table
Revision ID: 676084df61d1
Revises: 38efeff7c45a
Create Date: 2026-04-13 16:11:46.705481
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = '676084df61d1'
down_revision: Union[str, None] = '38efeff7c45a'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('profiles',
sa.Column('id', sa.String(), nullable=False),
sa.Column('user_id', sa.String(), nullable=False),
sa.Column('phone', sa.String(length=20), nullable=True),
sa.Column('date_of_birth', sa.Date(), nullable=True),
sa.Column('position', sa.String(length=128), nullable=True),
sa.Column('address', sa.String(length=255), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('user_id')
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('profiles')
# ### end Alembic commands ###
+74
View File
@@ -0,0 +1,74 @@
"""
Input sanitization utilities.
Every string that originates from user input and is destined for the database
MUST pass through these helpers before reaching a SQLAlchemy model or query.
SQLAlchemy's ORM already uses bound parameters (no raw SQL), so these helpers
address the layer above: ensuring data is well-formed, length-capped, and free
of null bytes or control characters before it is stored.
"""
import re
import unicodedata
from datetime import date
# ── Constants ─────────────────────────────────────────────────────────────────
_PHONE_RE = re.compile(r"^\+?[\d\s\-()\[\]]{7,20}$")
# ── Core helper ───────────────────────────────────────────────────────────────
def sanitize_str(value: str | None, max_len: int = 255) -> str | None:
"""Strip whitespace, reject null bytes and non-printable control characters,
enforce a maximum length. Returns None unchanged so optional fields work
naturally with ``Optional[str]`` annotations."""
if value is None:
return None
# Strip leading/trailing whitespace
value = value.strip()
# Reject null bytes (common injection vector)
if "\x00" in value:
raise ValueError("Input must not contain null bytes")
# Reject ASCII control characters (0x010x1F, 0x7F) except tab/newline/CR
# which may appear in multi-line fields. Use Unicode category 'Cc'.
for ch in value:
if unicodedata.category(ch) == "Cc" and ch not in ("\t", "\n", "\r"):
raise ValueError("Input contains invalid control characters")
if len(value) > max_len:
raise ValueError(f"Input must not exceed {max_len} characters")
return value if value != "" else None
def normalize_email(value: str) -> str:
"""Lowercase and strip an email address."""
return value.strip().lower()
def validate_phone(value: str | None) -> str | None:
"""Sanitize then validate phone number format."""
value = sanitize_str(value, max_len=20)
if value is None:
return None
if not _PHONE_RE.match(value):
raise ValueError(
"Phone number may only contain digits, spaces, +, -, (, ) and [ ] "
"and must be 720 characters"
)
return value
def validate_date_of_birth(value: date | None) -> date | None:
"""Reject obviously invalid birth dates (before 1900 or in the future)."""
if value is None:
return None
if value.year < 1900:
raise ValueError("Date of birth must be 1900 or later")
if value > date.today():
raise ValueError("Date of birth must not be in the future")
return value
+2 -1
View File
@@ -2,7 +2,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings from app.core.config import settings
from app.routers import auth, users from app.routers import auth, profile, users
app = FastAPI(title=settings.PROJECT_NAME, version="0.1.0") app = FastAPI(title=settings.PROJECT_NAME, version="0.1.0")
@@ -16,6 +16,7 @@ app.add_middleware(
app.include_router(auth.router, prefix="/api/auth", tags=["auth"]) app.include_router(auth.router, prefix="/api/auth", tags=["auth"])
app.include_router(users.router, prefix="/api/users", tags=["users"]) app.include_router(users.router, prefix="/api/users", tags=["users"])
app.include_router(profile.router, prefix="/api/profile", tags=["profile"])
@app.get("/api/health") @app.get("/api/health")
+2 -1
View File
@@ -1,3 +1,4 @@
from app.models.profile import Profile
from app.models.user import User from app.models.user import User
__all__ = ["User"] __all__ = ["User", "Profile"]
+34
View File
@@ -0,0 +1,34 @@
import uuid
from datetime import date, datetime
from sqlalchemy import Date, DateTime, ForeignKey, String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class Profile(Base):
__tablename__ = "profiles"
id: Mapped[str] = mapped_column(
String, primary_key=True, default=lambda: str(uuid.uuid4())
)
# One-to-one with users; deleting a user cascades to the profile.
user_id: Mapped[str] = mapped_column(
String, ForeignKey("users.id", ondelete="CASCADE"), unique=True, nullable=False
)
phone: Mapped[str | None] = mapped_column(String(20), nullable=True)
date_of_birth: Mapped[date | None] = mapped_column(Date, nullable=True)
# Job title / role within the organisation (e.g. "Software Engineer", "HR Manager").
position: Mapped[str | None] = mapped_column(String(128), nullable=True)
address: Mapped[str | None] = mapped_column(String(255), nullable=True)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False,
)
user = relationship("User", back_populates="profile")
+11 -1
View File
@@ -1,10 +1,14 @@
import uuid import uuid
from typing import TYPE_CHECKING
from sqlalchemy import Boolean, String from sqlalchemy import Boolean, String
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base from app.database import Base
if TYPE_CHECKING:
from app.models.profile import Profile
class User(Base): class User(Base):
__tablename__ = "users" __tablename__ = "users"
@@ -14,4 +18,10 @@ class User(Base):
hashed_password: Mapped[str] = mapped_column(String, nullable=False) hashed_password: Mapped[str] = mapped_column(String, nullable=False)
full_name: Mapped[str] = mapped_column(String, nullable=True) full_name: Mapped[str] = mapped_column(String, nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True) is_active: Mapped[bool] = mapped_column(Boolean, default=True)
# Role flag — True = admin, False = regular user.
# Never exposed in API responses; set only by direct DB or admin tooling.
is_superuser: Mapped[bool] = mapped_column(Boolean, default=False) is_superuser: Mapped[bool] = mapped_column(Boolean, default=False)
profile: Mapped["Profile"] = relationship(
"Profile", back_populates="user", uselist=False, cascade="all, delete-orphan"
)
+48
View File
@@ -0,0 +1,48 @@
from fastapi import APIRouter, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.deps import get_current_user
from app.models.profile import Profile
from app.models.user import User
from app.schemas.profile import ProfileRead, ProfileUpdate
router = APIRouter()
async def _get_or_create_profile(user: User, db: AsyncSession) -> Profile:
"""Return the user's profile, creating an empty one on first access."""
result = await db.execute(select(Profile).where(Profile.user_id == user.id))
profile = result.scalar_one_or_none()
if profile is None:
profile = Profile(user_id=user.id)
db.add(profile)
await db.commit()
await db.refresh(profile)
return profile
@router.get("/me", response_model=ProfileRead)
async def get_my_profile(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> Profile:
return await _get_or_create_profile(current_user, db)
@router.put("/me", response_model=ProfileRead)
async def update_my_profile(
body: ProfileUpdate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> Profile:
profile = await _get_or_create_profile(current_user, db)
# Only update fields that were explicitly provided in the request body.
for field, value in body.model_dump(exclude_unset=True).items():
setattr(profile, field, value)
await db.commit()
await db.refresh(profile)
return profile
+44
View File
@@ -0,0 +1,44 @@
from datetime import date, datetime
from pydantic import BaseModel, field_validator
from app.core.sanitize import sanitize_str, validate_date_of_birth, validate_phone
class ProfileRead(BaseModel):
id: str
user_id: str
phone: str | None
date_of_birth: date | None
position: str | None
address: str | None
updated_at: datetime
model_config = {"from_attributes": True}
class ProfileUpdate(BaseModel):
phone: str | None = None
date_of_birth: date | None = None
position: str | None = None
address: str | None = None
@field_validator("phone", mode="before")
@classmethod
def clean_phone(cls, v: str | None) -> str | None:
return validate_phone(v)
@field_validator("position", mode="before")
@classmethod
def clean_position(cls, v: str | None) -> str | None:
return sanitize_str(v, max_len=128)
@field_validator("address", mode="before")
@classmethod
def clean_address(cls, v: str | None) -> str | None:
return sanitize_str(v, max_len=255)
@field_validator("date_of_birth", mode="after")
@classmethod
def clean_dob(cls, v: date | None) -> date | None:
return validate_date_of_birth(v)
+12
View File
@@ -2,6 +2,8 @@ import re
from pydantic import BaseModel, EmailStr, field_validator from pydantic import BaseModel, EmailStr, field_validator
from app.core.sanitize import normalize_email, sanitize_str
# Common words that must not appear as whole words inside a password. # Common words that must not appear as whole words inside a password.
# Checked case-insensitively with word boundaries. # Checked case-insensitively with word boundaries.
_FORBIDDEN_WORDS = { _FORBIDDEN_WORDS = {
@@ -45,6 +47,16 @@ class UserCreate(BaseModel):
password: str password: str
full_name: str | None = None full_name: str | None = None
@field_validator("email", mode="before")
@classmethod
def normalize_email_field(cls, v: str) -> str:
return normalize_email(v)
@field_validator("full_name", mode="before")
@classmethod
def sanitize_full_name(cls, v: str | None) -> str | None:
return sanitize_str(v, max_len=128)
@field_validator("password") @field_validator("password")
@classmethod @classmethod
def password_strength(cls, v: str) -> str: def password_strength(cls, v: str) -> str:
@@ -0,0 +1,30 @@
# 2026-04-13 — Profile feature + input sanitization
**Timestamp:** 2026-04-13T02:00:00
## Summary
Added shared input sanitization layer applied to all database-bound inputs, introduced the `profiles` table for personal information (position, phone, date of birth, address), and built the frontend profile page with inline editing and a shared nav bar (Dashboard | Profile | Logout). Admin role flag (`is_superuser`) confirmed hidden from API. Security check patterns strengthened.
## Files Added
- `backend/app/core/sanitize.py` — shared helpers: `sanitize_str`, `normalize_email`, `validate_phone`, `validate_date_of_birth`; applied to every user-supplied string before it reaches the DB
- `backend/app/models/profile.py``Profile` ORM model (profiles table): `user_id` FK, `phone`, `date_of_birth`, `position`, `address`, `updated_at`
- `backend/app/schemas/profile.py``ProfileRead` / `ProfileUpdate` Pydantic schemas; all fields sanitized via shared helpers
- `backend/app/routers/profile.py``GET /api/profile/me` (lazy-create), `PUT /api/profile/me`
- `backend/alembic/versions/676084df61d1_add_profiles_table.py` — Alembic migration creating the profiles table
- `frontend/src/components/Nav.tsx` — shared nav bar: Dashboard, Profile, Logout
- `frontend/src/pages/ProfilePage.tsx` — profile view + inline edit form; uses TanStack Query for fetch/mutate
## Files Modified
- `backend/app/schemas/user.py` — added `normalize_email` and `sanitize_str` validators to `UserCreate`
- `backend/app/models/user.py` — added `Profile` back-reference; added admin-role comment on `is_superuser`
- `backend/app/models/__init__.py` — export `Profile`
- `backend/app/main.py` — register `/api/profile` router
- `frontend/src/api/client.ts` — added `ProfileData`, `ProfileUpdate` types, `getProfile`, `updateProfile`
- `frontend/src/App.tsx` — added `/profile` private route
- `frontend/src/pages/DashboardPage.tsx` — replaced inline logout with `Nav` component
- `scripts/security_check.py` — strengthened SQL injection patterns (f-string/format/% in execute, text() without bindparam); added SANIT category for raw request→DB patterns
- `TODO.md` — frontend feature items marked complete
- `README.md` — Current State updated
+9
View File
@@ -2,6 +2,7 @@ import { Routes, Route, Navigate } from "react-router-dom";
import LoginPage from "./pages/LoginPage"; import LoginPage from "./pages/LoginPage";
import RegisterPage from "./pages/RegisterPage"; import RegisterPage from "./pages/RegisterPage";
import DashboardPage from "./pages/DashboardPage"; import DashboardPage from "./pages/DashboardPage";
import ProfilePage from "./pages/ProfilePage";
import LoginSuccessPage from "./pages/LoginSuccessPage"; import LoginSuccessPage from "./pages/LoginSuccessPage";
import RegisterSuccessPage from "./pages/RegisterSuccessPage"; import RegisterSuccessPage from "./pages/RegisterSuccessPage";
import { useAuth } from "./hooks/useAuth"; import { useAuth } from "./hooks/useAuth";
@@ -33,6 +34,14 @@ export default function App() {
</PrivateRoute> </PrivateRoute>
} }
/> />
<Route
path="/profile"
element={
<PrivateRoute>
<ProfilePage />
</PrivateRoute>
}
/>
</Routes> </Routes>
); );
} }
+24
View File
@@ -21,3 +21,27 @@ export const register = (email: string, password: string, full_name?: string) =>
// --- Users --- // --- Users ---
export const getMe = () => api.get("/users/me").then((r) => r.data); export const getMe = () => api.get("/users/me").then((r) => r.data);
// --- Profile ---
export interface ProfileData {
id: string;
user_id: string;
phone: string | null;
date_of_birth: string | null;
position: string | null;
address: string | null;
updated_at: string;
}
export interface ProfileUpdate {
phone?: string | null;
date_of_birth?: string | null;
position?: string | null;
address?: string | null;
}
export const getProfile = () =>
api.get<ProfileData>("/profile/me").then((r) => r.data);
export const updateProfile = (data: ProfileUpdate) =>
api.put<ProfileData>("/profile/me", data).then((r) => r.data);
+16
View File
@@ -0,0 +1,16 @@
import { Link } from "react-router-dom";
import { useAuth } from "../hooks/useAuth";
export default function Nav() {
const { logout } = useAuth();
return (
<nav style={{ display: "flex", gap: 16, padding: "12px 24px", borderBottom: "1px solid #ccc" }}>
<Link to="/">Dashboard</Link>
<Link to="/profile">Profile</Link>
<button onClick={logout} style={{ marginLeft: "auto", cursor: "pointer" }}>
Logout
</button>
</nav>
);
}
+4 -3
View File
@@ -1,16 +1,17 @@
import { useQuery } from "@tanstack/react-query"; import { useQuery } from "@tanstack/react-query";
import { getMe } from "../api/client"; import { getMe } from "../api/client";
import { useAuth } from "../hooks/useAuth"; import Nav from "../components/Nav";
export default function DashboardPage() { export default function DashboardPage() {
const { logout } = useAuth();
const { data: user } = useQuery({ queryKey: ["me"], queryFn: getMe }); const { data: user } = useQuery({ queryKey: ["me"], queryFn: getMe });
return ( return (
<>
<Nav />
<div style={{ padding: 32 }}> <div style={{ padding: 32 }}>
<h1>Dashboard</h1> <h1>Dashboard</h1>
{user && <p>Welcome, {user.full_name ?? user.email}</p>} {user && <p>Welcome, {user.full_name ?? user.email}</p>}
<button onClick={logout}>Logout</button>
</div> </div>
</>
); );
} }
+154
View File
@@ -0,0 +1,154 @@
import { useState } from "react";
import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import { getMe, getProfile, updateProfile, type ProfileUpdate } from "../api/client";
import Nav from "../components/Nav";
export default function ProfilePage() {
const queryClient = useQueryClient();
const [editing, setEditing] = useState(false);
const [error, setError] = useState<string | null>(null);
const { data: user } = useQuery({ queryKey: ["me"], queryFn: getMe });
const { data: profile, isLoading } = useQuery({
queryKey: ["profile"],
queryFn: getProfile,
});
const [form, setForm] = useState<ProfileUpdate>({});
const mutation = useMutation({
mutationFn: updateProfile,
onSuccess: () => {
queryClient.invalidateQueries({ queryKey: ["profile"] });
setEditing(false);
setError(null);
},
onError: (err: any) => {
const detail = err?.response?.data?.detail;
if (Array.isArray(detail)) {
setError(detail.map((d: any) => d.msg).join("; "));
} else {
setError(detail ?? "Failed to save profile");
}
},
});
const startEditing = () => {
setForm({
phone: profile?.phone ?? "",
date_of_birth: profile?.date_of_birth ?? "",
position: profile?.position ?? "",
address: profile?.address ?? "",
});
setError(null);
setEditing(true);
};
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
// Send null for empty strings so the backend clears the field
const payload: ProfileUpdate = {};
for (const [k, v] of Object.entries(form)) {
(payload as any)[k] = v === "" ? null : v;
}
mutation.mutate(payload);
};
if (isLoading) return <><Nav /><div style={{ padding: 32 }}>Loading</div></>;
return (
<>
<Nav />
<div style={{ padding: 32, maxWidth: 480 }}>
<h1>Profile</h1>
{!editing ? (
<>
<table style={{ borderCollapse: "collapse", width: "100%" }}>
<tbody>
<Row label="Email" value={user?.email} />
<Row label="Full name" value={user?.full_name} />
<Row label="Position" value={profile?.position} />
<Row label="Phone" value={profile?.phone} />
<Row label="Date of birth" value={profile?.date_of_birth} />
<Row label="Address" value={profile?.address} />
</tbody>
</table>
<button onClick={startEditing} style={{ marginTop: 16 }}>
Edit
</button>
</>
) : (
<form onSubmit={handleSubmit}>
<Field
label="Position"
value={form.position ?? ""}
onChange={(v) => setForm((f) => ({ ...f, position: v }))}
/>
<Field
label="Phone"
value={form.phone ?? ""}
onChange={(v) => setForm((f) => ({ ...f, phone: v }))}
type="tel"
/>
<Field
label="Date of birth"
value={form.date_of_birth ?? ""}
onChange={(v) => setForm((f) => ({ ...f, date_of_birth: v }))}
type="date"
/>
<Field
label="Address"
value={form.address ?? ""}
onChange={(v) => setForm((f) => ({ ...f, address: v }))}
/>
{error && <p style={{ color: "red" }}>{error}</p>}
<div style={{ display: "flex", gap: 8, marginTop: 16 }}>
<button type="submit" disabled={mutation.isPending}>
{mutation.isPending ? "Saving…" : "Save"}
</button>
<button type="button" onClick={() => setEditing(false)}>
Cancel
</button>
</div>
</form>
)}
</div>
</>
);
}
function Row({ label, value }: { label: string; value?: string | null }) {
return (
<tr>
<td style={{ padding: "6px 12px 6px 0", fontWeight: 600, whiteSpace: "nowrap" }}>
{label}
</td>
<td style={{ padding: "6px 0" }}>{value ?? "—"}</td>
</tr>
);
}
function Field({
label,
value,
onChange,
type = "text",
}: {
label: string;
value: string;
onChange: (v: string) => void;
type?: string;
}) {
return (
<div style={{ marginBottom: 12 }}>
<label style={{ display: "block", marginBottom: 4 }}>{label}</label>
<input
type={type}
value={value}
onChange={(e) => onChange(e.target.value)}
style={{ width: "100%", padding: "6px 8px", boxSizing: "border-box" }}
/>
</div>
);
}
+28 -5
View File
@@ -7,9 +7,10 @@ Checks:
1. Hardcoded secrets / credentials in staged files 1. Hardcoded secrets / credentials in staged files
2. Dangerous patterns (eval, exec, shell=True, pickle) 2. Dangerous patterns (eval, exec, shell=True, pickle)
3. Weak cryptography (MD5, SHA1 for passwords, DES) 3. Weak cryptography (MD5, SHA1 for passwords, DES)
4. SQL injection risk (raw string formatting into queries) 4. SQL injection risk (f-strings / .format() / % in execute/query/text())
5. Debug/development flags left in code 5. Missing input sanitization (raw request attributes passed to DB)
6. bandit static analysis on Python files 6. Debug/development flags left in code
7. bandit static analysis on Python files
""" """
import os import os
@@ -48,8 +49,29 @@ WEAK_CRYPTO_PATTERNS = [
] ]
SQL_INJECTION_PATTERNS = [ SQL_INJECTION_PATTERNS = [
(r'(execute|query)\s*\(\s*[f"\'].*%s.*["\']\.format|f".*SELECT.*{', # f-string or .format() passed directly to execute/query
"potential SQL injection via string formatting"), (r'(execute|query)\s*\(\s*f["\']',
"potential SQL injection: f-string passed to execute/query"),
(r'(execute|query)\s*\(.*\.format\s*\(',
"potential SQL injection: .format() passed to execute/query"),
(r'(execute|query)\s*\(.*%\s*[({]',
"potential SQL injection: %-formatting passed to execute/query"),
# SQLAlchemy text() used without bindparam / colon-style params
(r'\btext\s*\(\s*f["\']',
"SQLAlchemy text() with f-string — use bindparam() instead"),
(r'\btext\s*\(.*\.format\s*\(',
"SQLAlchemy text() with .format() — use bindparam() instead"),
# String concatenation into a variable named *query* or *sql*
(r'(sql|query)\s*[+]=\s*["\']',
"possible SQL string concatenation — use ORM or bindparam()"),
]
SANITIZATION_PATTERNS = [
# Pydantic str field without a validator on the same or adjacent line
# Flags 'str' fields in BaseModel subclasses that look unvalidated.
# Heuristic: detects direct assignment to session/db without going through a schema.
(r'\bdb\.(add|execute)\s*\(.*request\.',
"raw request attribute passed to DB — route through a Pydantic schema first"),
] ]
DEBUG_PATTERNS = [ DEBUG_PATTERNS = [
@@ -62,6 +84,7 @@ ALL_PATTERNS = (
+ [("DANGER", p, m) for p, m in DANGEROUS_PATTERNS] + [("DANGER", p, m) for p, m in DANGEROUS_PATTERNS]
+ [("CRYPTO", p, m) for p, m in WEAK_CRYPTO_PATTERNS] + [("CRYPTO", p, m) for p, m in WEAK_CRYPTO_PATTERNS]
+ [("SQLINJ", p, m) for p, m in SQL_INJECTION_PATTERNS] + [("SQLINJ", p, m) for p, m in SQL_INJECTION_PATTERNS]
+ [("SANIT", p, m) for p, m in SANITIZATION_PATTERNS]
+ [("DEBUG", p, m) for p, m in DEBUG_PATTERNS] + [("DEBUG", p, m) for p, m in DEBUG_PATTERNS]
) )