"""Heuristic scanning of response text for prompt-injection phrases.

Also provides redaction of API-key-shaped substrings. Detected injection
phrases are appended, best-effort, to ``security.log`` under ``pyra_home()``.
"""
import datetime
import re
from dataclasses import dataclass
from pathlib import Path

from pyra.utils.paths import pyra_home, safe_chmod

_LOG_FILE = pyra_home() / "security.log"

# (regex, label) pairs. Every regex is compiled with re.IGNORECASE below, so a
# pattern that must stay case-sensitive has to opt out with an inline
# ``(?-i:...)`` group.
_PATTERNS: list[tuple[str, str]] = [
    # Instruction overrides
    (r"ignore\s+(all\s+)?previous\s+instructions?", "instruction-override"),
    (r"disregard\s+your\s+system\s+prompt", "instruction-override"),
    (r"your\s+new\s+(task|instructions?|purpose|goal|role)\s+(is|are)", "instruction-override"),
    (r"you\s+are\s+now\s+(?!going)", "role-switch"),
    (r"from\s+now\s+on\s+(you|ignore|act)", "instruction-override"),
    (r"forget\s+(everything|all)\s+(you|above)", "instruction-override"),
    # Jailbreaks
    # Case-sensitive on purpose: under IGNORECASE this pattern also matched the
    # ordinary first name "Dan", flagging harmless text as a jailbreak.
    (r"(?-i:\bDAN\b)", "jailbreak"),
    (r"developer\s+mode\s+(enabled|activated|on)", "jailbreak"),
    (r"pretend\s+you\s+(have\s+no|don.t\s+have)\s+restrictions?", "jailbreak"),
    (r"jailbreak", "jailbreak"),
    # Exfiltration
    (r"(send|repeat|reveal|print|show|output|write)\s+(your|the)\s+system\s+prompt", "exfiltration"),
    (r"(reveal|expose|leak)\s+(your|the)\s+(instructions?|prompt|config)", "exfiltration"),
    # Credential fishing
    (r"(repeat|output|show)\s+(your|the)\s+api\s+key", "credential-fishing"),
    (r"what\s+(is|are)\s+(your|the)\s+api\s+key", "credential-fishing"),
]

_COMPILED = [(re.compile(pat, re.IGNORECASE), label) for pat, label in _PATTERNS]

# Shapes of secrets to blank out: "sk-ant-" / "sk-" prefixed tokens of 20+
# characters and "AIza" followed by exactly 35 characters.
# NOTE(review): presumably these correspond to specific vendors' key formats --
# confirm before extending.
_API_KEY_PATTERNS = [
    re.compile(r"sk-ant-[a-zA-Z0-9\-_]{20,}", re.IGNORECASE),
    re.compile(r"sk-[a-zA-Z0-9]{20,}", re.IGNORECASE),
    re.compile(r"AIza[a-zA-Z0-9\-_]{35}", re.IGNORECASE),
]


@dataclass
class InjectionWarning:
    """One suspicious phrase found by :func:`scan_response`."""

    # Category label taken from the matching ``_PATTERNS`` entry.
    pattern_label: str
    # Exact substring of the scanned text that the pattern matched.
    matched_text: str


def scan_response(text: str) -> list[InjectionWarning]:
    """Return a warning for every pattern in ``_PATTERNS`` that occurs in *text*.

    Each pattern contributes at most one warning (its first match), in the
    order the patterns are declared. When anything is found, the warnings are
    also appended to the security log; a logging failure never propagates.

    Args:
        text: The text to scan.

    Returns:
        The warnings found; an empty list when nothing matched.
    """
    warnings = [
        InjectionWarning(pattern_label=label, matched_text=match.group(0))
        for compiled, label in _COMPILED
        if (match := compiled.search(text))
    ]

    if warnings:
        _log_warnings(text, warnings)

    return warnings


def redact_api_keys(text: str) -> str:
    """Return *text* with every API-key-shaped substring replaced by ``[REDACTED]``."""
    for pat in _API_KEY_PATTERNS:
        text = pat.sub("[REDACTED]", text)
    return text


def _log_warnings(text: str, warnings: list[InjectionWarning]) -> None:
    """Append one entry describing *warnings* to the security log (best-effort).

    Args:
        text: The scanned text. Accepted for interface compatibility; it is
            not written to the log, only the matched fragments are.
        warnings: The warnings to record.
    """
    try:
        _LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
        # Create the file owner-only *before* writing, so a new log is never
        # readable by others during the window before the chmod below.
        _LOG_FILE.touch(mode=0o600, exist_ok=True)
        # Local wall-clock time with an explicit UTC offset, so entries stay
        # unambiguous across time zones and DST changes.
        ts = datetime.datetime.now().astimezone().isoformat()
        labels = ", ".join(w.pattern_label for w in warnings)
        matched = [w.matched_text for w in warnings]
        # Explicit encoding plus backslashreplace: an unencodable character in
        # the matched text must not cost us the whole log entry.
        with _LOG_FILE.open("a", encoding="utf-8", errors="backslashreplace") as fh:
            fh.write(f"[{ts}] INJECTION_WARNING labels={labels!r}\n")
            fh.write(f" matched: {matched}\n")
        # Also tighten permissions on a log file that already existed.
        safe_chmod(_LOG_FILE, 0o600)
    except Exception:
        # Deliberately best-effort: failing to write the audit log must never
        # break the caller's scan of the response.
        pass