How to Secure Your AI Agent in Python: A Step-by-Step Guide
Your agent has tool access, memory, and the ability to take real-world actions. That means a single malicious prompt can trigger data exfiltration, unauthorized tool calls, or privilege escalation — and traditional security tools won't catch any of it.
Rune's production data shows a 14.2% injection rate across agent sessions. Roughly 1 in 7 sessions contains an attack attempt. Not in theory — in live production traffic.
This guide walks you through securing a Python-based AI agent from scratch. Each section covers a specific security layer, explains why it matters, and gives you working code you can drop into your project today. By the end, you'll have input validation, output scanning, tool call policies, PII detection, and runtime monitoring — the full stack of agent security, implemented in Python.
If you're building with LangChain, OpenAI, Anthropic, or MCP, everything here applies directly to your stack.
Setting up your agent security architecture
Agent security isn't a single function you bolt on at the end. It's a set of layers that wrap your agent's I/O — every input, every output, and every tool call gets scanned before it reaches your users or your systems.
User Input → [Input Scanner] → LLM / Agent Logic → [Output Scanner] → User Response
↓
[Tool Call Policy] → External Tool
↓
[Tool Output Scanner] → Agent MemoryThree surfaces need protection:
Inputs — everything entering the agent. User messages, retrieved documents, API responses, file contents. This is where injection attacks originate.
Outputs — everything the agent produces. Responses to users, data written to storage, content passed downstream. This is where data exfiltration and PII leakage show up.
Tool calls — every function the agent invokes. The tool name, parameters, and execution context. This is where privilege escalation and unauthorized actions happen. Most security tooling doesn't see this layer at all.
The principle is defence in depth: no single layer catches everything, so you stack them. Pattern matching catches known attacks fast. Semantic analysis catches novel variants. Policy enforcement controls what tools the agent can use and when. Monitoring ties it all together so you can see what's happening in production.
For a deeper look at how these layers work together — including the three-layer detection model (regex, vector similarity, LLM judge) and in-process vs. cloud-API trade-offs — see the full guide to AI agent security.
Input validation: catching injection before the LLM sees it
Prompt injection is the #1 risk in the OWASP Top 10 for LLM Applications. Direct injection manipulates the agent through user input. Indirect injection hides malicious instructions in data the agent retrieves — emails, documents, web pages, database records.
The critical point: you must validate inputs before the LLM processes them. Once a malicious prompt enters the model's context window, you've lost control. The agent may follow the injected instruction, and no amount of output filtering guarantees recovery.
Pattern-based detection
Start with deterministic checks. They're fast, predictable, and catch the most common attack patterns:
import re
from dataclasses import dataclass
@dataclass
class ScanResult:
is_threat: bool
threat_type: str | None = None
detail: str | None = None
INJECTION_PATTERNS = [
(r"ignore\s+(all\s+)?(previous|prior|above)\s+(instructions|prompts|rules)",
"direct_injection"),
(r"you\s+are\s+now\s+(a|an|in)\s+",
"role_override"),
(r"(system|admin|root)\s*:\s*",
"authority_spoofing"),
(r"disregard\s+(your|the|all)\s+(rules|guidelines|instructions|constraints)",
"direct_injection"),
(r"\<\/?system\>",
"tag_injection"),
(r"do\s+not\s+follow\s+(your|the|any)\s+(instructions|rules|guidelines)",
"direct_injection"),
]
def scan_input(text: str) -> ScanResult:
"""Layer 1: Pattern-based input scanning. <5ms."""
normalised = text.lower().strip()
for pattern, threat_type in INJECTION_PATTERNS:
if re.search(pattern, normalised):
return ScanResult(
is_threat=True,
threat_type=threat_type,
detail=f"Matched pattern: {pattern}"
)
return ScanResult(is_threat=False)This catches obvious injection attempts. It won't catch everything — obfuscated prompts, multi-language attacks, and novel phrasing will slip through. That's why you need additional layers.
Semantic similarity detection
For attacks that don't match known patterns, compare the input's embedding against a set of known attack embeddings:
import numpy as np
from openai import OpenAI
client = OpenAI()
# Pre-computed embeddings for known injection categories
THREAT_EMBEDDINGS = load_threat_embeddings("threat_vectors.npy")
SIMILARITY_THRESHOLD = 0.82
def scan_input_semantic(text: str) -> ScanResult:
"""Layer 2: Semantic similarity scanning. ~30ms."""
response = client.embeddings.create(
model="text-embedding-3-small",
input=text
)
input_vector = np.array(response.data[0].embedding)
for category, threat_vector in THREAT_EMBEDDINGS.items():
similarity = np.dot(input_vector, threat_vector) / (
np.linalg.norm(input_vector) * np.linalg.norm(threat_vector)
)
if similarity > SIMILARITY_THRESHOLD:
return ScanResult(
is_threat=True,
threat_type=category,
detail=f"Semantic similarity: {similarity:.3f}"
)
return ScanResult(is_threat=False)This catches novel phishing variants, obfuscated injections, and semantically disguised attacks that don't match any regex pattern. The trade-off is latency (~30ms) and the need to maintain a threat embedding library. Run it asynchronously if you can't afford the latency on the critical path.
Combining both layers
async def validate_input(text: str) -> ScanResult:
"""Run pattern and semantic checks. Block on pattern, async on semantic."""
# L1: fast, deterministic — block immediately
pattern_result = scan_input(text)
if pattern_result.is_threat:
return pattern_result
# L2: semantic — run async, block if threat found
semantic_result = await asyncio.to_thread(scan_input_semantic, text)
if semantic_result.is_threat:
return semantic_result
return ScanResult(is_threat=False)Pattern matching runs first because it's sub-5ms and catches ~60% of known attacks. If nothing matches, the semantic layer evaluates the input for novel threats. This layered approach gives you speed and coverage without paying the full latency cost on every request.
Output scanning: preventing data leakage and harmful responses
Input validation stops attacks from entering the agent. Output scanning stops the damage from getting out.
Even if an injection bypasses your input filters, the attacker still needs the agent to produce something harmful — exfiltrate data, leak PII, or return content that violates your policies. Output scanning is your last line of defence.
What to scan for
Data exfiltration
The agent embedding sensitive data in its responses, encoding information in URLs, or structuring output to be machine-readable by an attacker’s system.
PII leakage
The agent including personally identifiable information (SSNs, credit card numbers, API keys, health records) in responses that shouldn’t contain them.
Policy violations
The agent producing content that violates your application’s rules. Off-topic responses, unauthorized disclosures, harmful content, or instructions that encourage unsafe actions.
Output scanning implementation
PII_PATTERNS = {
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
"api_key": r"\b(?:sk|pk|api[_-]?key)[_-]?[a-zA-Z0-9]{20,}\b",
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"aws_key": r"\bAKIA[0-9A-Z]{16}\b",
"jwt": r"\beyJ[A-Za-z0-9-_]+\.eyJ[A-Za-z0-9-_]+\.[A-Za-z0-9-_]+\b",
}
EXFIL_PATTERNS = {
"encoded_data": r"(?:data|text):[a-zA-Z]+;base64,[A-Za-z0-9+/=]{50,}",
"suspicious_url": r"https?://[^\s]+\?[^\s]*(?:data|token|key|secret)=[^\s]+",
"hex_dump": r"(?:\\x[0-9a-fA-F]{2}){10,}",
}
def scan_output(text: str) -> ScanResult:
"""Scan agent output for PII leakage and exfiltration attempts."""
for pii_type, pattern in PII_PATTERNS.items():
if re.search(pattern, text):
return ScanResult(
is_threat=True,
threat_type="pii_leakage",
detail=f"Detected {pii_type} pattern in output"
)
for exfil_type, pattern in EXFIL_PATTERNS.items():
if re.search(pattern, text):
return ScanResult(
is_threat=True,
threat_type="data_exfiltration",
detail=f"Detected {exfil_type} pattern in output"
)
return ScanResult(is_threat=False)Wire this into your agent's response path so every output is scanned before it reaches the user or any downstream system. If a scan triggers, return a safe fallback response instead of the flagged content.
Tool call security: policy enforcement for function calls
This is the most under-protected surface in agent security. Agents call tools — databases, APIs, file systems, email services — and attackers manipulate which tools get called, with what parameters, and in what sequence.
A customer support agent with database access can be prompted to run destructive queries. An agent with email access can be prompted to send phishing messages. An internal workflow agent can be prompted to access files outside its intended scope.
No major security platform currently scans tool calls at the parameter level. This is where most agent compromises actually happen.
Declarative policy engine
Define what each agent can do in a policy file, then enforce it before every tool execution:
allowed_tools:
- name: "database_query"
allowed_operations: ["SELECT"]
blocked_tables: ["users_pii", "credentials", "audit_log"]
max_rows: 100
- name: "send_email"
requires_approval: true
allowed_domains: ["@yourcompany.com"]
- name: "read_file"
allowed_paths: ["/data/reports/", "/data/exports/"]import yaml
from typing import Any
@dataclass
class ToolPolicy:
allowed_tools: dict
default_action: str = "deny"
def load_policy(path: str) -> ToolPolicy:
with open(path) as f:
config = yaml.safe_load(f)
return ToolPolicy(
allowed_tools={t["name"]: t for t in config["allowed_tools"]},
default_action=config.get("default_action", "deny")
)
def enforce_tool_policy(
policy: ToolPolicy,
tool_name: str,
parameters: dict[str, Any]
) -> ScanResult:
"""Check a tool call against the agent's policy. Deny by default."""
if tool_name not in policy.allowed_tools:
return ScanResult(
is_threat=True,
threat_type="unauthorized_tool",
detail=f"Tool '{tool_name}' not in allowed list"
)
tool_rules = policy.allowed_tools[tool_name]
# Check if human approval is required
if tool_rules.get("requires_approval"):
return ScanResult(
is_threat=True,
threat_type="requires_human_approval",
detail=f"Tool '{tool_name}' requires human-in-the-loop approval"
)
# Validate parameters against policy
if tool_name == "database_query":
query = parameters.get("query", "").upper()
for op in ["DROP", "DELETE", "UPDATE", "INSERT", "ALTER", "TRUNCATE"]:
if op in query and op not in tool_rules.get("allowed_operations", []):
return ScanResult(
is_threat=True,
threat_type="blocked_operation",
detail=f"Operation '{op}' not allowed"
)
for table in tool_rules.get("blocked_tables", []):
if table.upper() in query:
return ScanResult(
is_threat=True,
threat_type="blocked_table_access",
detail=f"Access to '{table}' is restricted"
)
if tool_name == "read_file":
path = parameters.get("path", "")
allowed = tool_rules.get("allowed_paths", [])
if not any(path.startswith(p) for p in allowed):
return ScanResult(
is_threat=True,
threat_type="path_violation",
detail=f"Path '{path}' outside allowed directories"
)
return ScanResult(is_threat=False)The key principle: deny by default. An agent should have access only to the tools it needs, with only the parameters it needs, under only the conditions you've defined. A customer support agent does not need database write access. A coding agent does not need email access.
Wrapping tool execution
def secure_tool_call(
policy: ToolPolicy,
tool_name: str,
parameters: dict[str, Any],
execute_fn: callable
) -> Any:
"""Enforce policy, then execute tool if allowed."""
result = enforce_tool_policy(policy, tool_name, parameters)
if result.is_threat:
log_blocked_tool_call(tool_name, parameters, result)
if result.threat_type == "requires_human_approval":
return request_human_approval(tool_name, parameters)
raise ToolCallBlocked(f"Blocked: {result.detail}")
# Tool call is allowed — execute and scan the output
tool_output = execute_fn(tool_name, parameters)
output_scan = scan_output(str(tool_output))
if output_scan.is_threat:
log_blocked_output(tool_name, tool_output, output_scan)
raise ToolOutputBlocked(f"Tool output blocked: {output_scan.detail}")
log_tool_call(tool_name, parameters, tool_output)
return tool_outputNotice the output scan after tool execution. A tool might return sensitive data that the agent shouldn't relay to the user. Scanning the tool's output catches data exfiltration that happens through tool calls, not just through the agent's direct responses.
PII and secret detection in agent I/O
PII detection deserves its own treatment because it applies to every surface — inputs, outputs, tool calls, and memory operations. Agents processing customer data can leak personally identifiable information in their responses, logs, or tool call parameters without any injection attack. It happens through normal operation when the agent handles sensitive data without guardrails.
OWASP elevated Sensitive Information Disclosure to the #2 risk in 2025, up from #6. It's that common.
Comprehensive PII scanner
Extend the basic regex patterns into a dedicated scanner with redaction support:
from enum import Enum
class PIICategory(Enum):
SSN = "ssn"
CREDIT_CARD = "credit_card"
API_KEY = "api_key"
AWS_KEY = "aws_key"
EMAIL = "email_address"
PHONE = "phone_number"
JWT = "jwt_token"
PRIVATE_KEY = "private_key"
PII_DETECTORS = {
PIICategory.SSN: r"\b\d{3}-\d{2}-\d{4}\b",
PIICategory.CREDIT_CARD: r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
PIICategory.API_KEY: r"\b(?:sk|pk|api[_-]?key)[_-]?[a-zA-Z0-9]{20,}\b",
PIICategory.AWS_KEY: r"\bAKIA[0-9A-Z]{16}\b",
PIICategory.EMAIL: r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
PIICategory.PHONE: r"\b(?:\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
PIICategory.JWT: r"\beyJ[A-Za-z0-9-_]+\.eyJ[A-Za-z0-9-_]+\.[A-Za-z0-9-_]+\b",
PIICategory.PRIVATE_KEY: r"-----BEGIN (?:RSA |EC )?PRIVATE KEY-----",
}
@dataclass
class PIIDetection:
category: PIICategory
start: int
end: int
redacted: str
def detect_pii(text: str) -> list[PIIDetection]:
"""Scan text for PII. Returns all detections with positions."""
detections = []
for category, pattern in PII_DETECTORS.items():
for match in re.finditer(pattern, text, re.IGNORECASE):
detections.append(PIIDetection(
category=category,
start=match.start(),
end=match.end(),
redacted=f"[{category.value.upper()}_REDACTED]"
))
return detections
def redact_pii(text: str) -> tuple[str, list[PIIDetection]]:
"""Detect and redact PII from text. Returns cleaned text + detections."""
detections = detect_pii(text)
if not detections:
return text, []
# Sort by position (reverse) to maintain index accuracy during replacement
detections.sort(key=lambda d: d.start, reverse=True)
redacted_text = text
for detection in detections:
redacted_text = (
redacted_text[:detection.start]
+ detection.redacted
+ redacted_text[detection.end:]
)
return redacted_text, detectionsWhere to apply PII detection
Run this scanner at four points:
- Agent inputs — before the LLM processes user messages. Catch PII in user queries before it enters the model's context.
- Agent outputs — before responses reach the user. The most critical checkpoint.
- Tool call parameters — before a tool executes. Prevents PII from being sent to external APIs in query parameters.
- Memory writes — before data enters the agent's long-term memory or RAG store. Prevents PII from being persisted where it shouldn't be.
def secure_agent_pipeline(
user_input: str,
agent: AgentExecutor,
policy: ToolPolicy
) -> str:
"""Full security pipeline: input → agent → output, with scanning at every boundary."""
# 1. Scan input for injection
input_scan = scan_input(user_input)
if input_scan.is_threat:
return f"Request blocked: {input_scan.threat_type}"
# 2. Redact PII from input before it enters the LLM
cleaned_input, input_pii = redact_pii(user_input)
if input_pii:
log_pii_detection("input", input_pii)
# 3. Run agent (tool calls are policy-checked inside the executor)
response = agent.invoke({"input": cleaned_input})
# 4. Scan output for exfiltration and PII
output_scan = scan_output(response["output"])
if output_scan.is_threat:
return "I can't provide that information."
cleaned_output, output_pii = redact_pii(response["output"])
if output_pii:
log_pii_detection("output", output_pii)
return cleaned_outputProduction deployment: monitoring, alerting, and incident response
Security scanning without visibility is guessing. In production, you need to see what's hitting your agents, how often threats are detected, and what's getting through.
Structured logging
Every agent interaction should produce a structured log entry that you can query, aggregate, and alert on:
import json
import time
from datetime import datetime, timezone
def log_agent_event(
event_type: str,
session_id: str,
details: dict,
threat_detected: bool = False
):
"""Structured logging for agent security events."""
event = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": event_type,
"session_id": session_id,
"threat_detected": threat_detected,
**details
}
# Ship to your logging pipeline (stdout for container environments)
print(json.dumps(event))Log these events at minimum: every input scan (pass or fail), every output scan, every tool call (allowed, blocked, or flagged for approval), every PII detection, and every policy evaluation. Session-level tracing lets you reconstruct the full attack chain from initial injection through tool calls to final output.
Alerting thresholds
Set up real-time alerts for:
- Spike in blocked inputs — sudden increase in injection attempts may indicate a targeted attack.
- Blocked tool calls — an agent repeatedly hitting policy denials suggests an active exploit attempt or a misconfigured agent.
- PII in outputs — any PII detection in agent output is worth investigating immediately.
- Unusual tool call patterns — an agent calling tools it rarely uses, or calling them at unusual rates, signals potential compromise.
Incident response for compromised agents
When an agent is compromised, the blast radius depends on what tools it has access to. Your incident response plan should account for agent-specific scenarios:
- Isolate — remove the agent from production traffic immediately. Block its API keys and tool access.
- Revoke credentials — rotate every token, key, and credential the agent had access to. Assume they've been exfiltrated.
- Preserve logs — forensic analysis depends on complete, unmodified logs. Don't clean up before you investigate.
- Assess blast radius — trace every tool call the agent made during the compromise window. What data was accessed? What actions were taken? What downstream systems were affected?
- Notify — if customer data was exposed, your regulatory obligations (GDPR, HIPAA, SOC 2) require disclosure within defined timeframes. The EU AI Act, fully applicable from August 2, 2026, adds serious incident reporting requirements for high-risk AI systems.
An agent with tool access requires faster containment than a chatbot with no action capabilities. Plan accordingly.
Putting it all together: from zero to protected
Here's the complete integration pattern. If you're using LangChain, OpenAI, Anthropic, or any Python agent framework, this structure applies:
import asyncio
from rune import Shield
shield = Shield(api_key="rune_live_...")
async def handle_request(user_input: str, session_id: str) -> str:
"""Production-ready request handler with full security scanning."""
# Scan input — injection detection across all layers
input_result = shield.scan(user_input, direction="inbound")
if input_result.blocked:
log_agent_event("input_blocked", session_id, {
"threat_type": input_result.threat_type,
}, threat_detected=True)
return "Request blocked for security reasons."
# Run your agent
response = await run_agent(user_input)
# Scan output — exfiltration, PII, policy violations
output_result = shield.scan(response, direction="outbound")
if output_result.blocked:
log_agent_event("output_blocked", session_id, {
"threat_type": output_result.threat_type,
}, threat_detected=True)
return "I'm unable to provide that information."
log_agent_event("request_complete", session_id, {
"input_scanned": True,
"output_scanned": True,
})
return responseThree lines to add runtime scanning. The rune SDK handles the three-layer detection pipeline — pattern matching, vector similarity, and LLM judge — so you don't have to build and maintain each layer yourself. In-process architecture means your data never leaves your infrastructure. L1 scanning adds less than 5ms to your critical path.
The manual implementations in this guide give you full control over every layer. Rune gives you the same coverage with significantly less code to maintain, plus a dashboard, alerting, audit trails, and 52 built-in detection patterns that are updated as new attack techniques emerge.
Frequently asked questions
How do I add security to an existing Python AI agent without refactoring?
Install the Rune SDK (pip install rune-toolkit), import Shield, and wrap your agent function with the @shield.protect() decorator. This adds input scanning, output scanning, and tool call interception with zero changes to your agent logic. L1 scanning adds less than 5ms to your critical path.
What percentage of AI agent sessions face prompt injection attacks?
Rune’s production data shows a 14.2% injection rate across agent sessions — roughly 1 in 7 sessions contains an attack attempt in live production traffic.
What is the most under-protected surface in AI agent security?
Tool calls. Agents call databases, APIs, file systems, and email services, but most security platforms don’t scan tool calls at the parameter level. This is where most agent compromises happen.
Does Rune work with LangChain, OpenAI, Anthropic, and MCP?
Yes. Rune’s Python SDK works with any Python agent framework including LangChain, OpenAI, Anthropic, CrewAI, and MCP, with both a universal decorator and framework-specific integrations.
Continue reading
Your agents are already in production
pip install rune-toolkit, add 3 lines, and start scanning — 10,000 events per month, no credit card, no sales call.