Here’s a stat that should concern every security team: 73% of developers say they regularly ignore findings from security scanning tools. Not because they don’t care about security — because the tools are unusable.
The typical security scanning experience: a 30-minute CI scan produces a 500-line report full of false positives, delivered in a separate portal nobody visits, with zero context on how to fix anything. Developers learn to ignore it. Security teams wonder why nothing gets fixed. Everyone blames the other side.
The problem isn’t detection accuracy — it’s developer experience. This article shows you how to build a vulnerability detection system that developers actually want to use.
The Architecture That Works
A system developers adopt has five layers: integration points where developers already work, a unified API, a multi-scanner engine, a results processor that eliminates noise, and a feedback loop that gets smarter over time.
The critical insight: the scanning engine is the easy part. The hard part — and where most teams fail — is everything around it: the integrations, the noise reduction, and the developer experience.
Why Security Tools Fail (And What to Do Instead)
Before building anything, understand why developers ignore existing tools: too much noise, slow scans, no guidance on how to fix, and results delivered outside their workflow.
The adoption formula is simple:
Adoption = (Low Noise + Fast Scans + Auto-Fix + In-Workflow) / Friction
Every design decision should optimize for this equation. Let’s build it.
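The formula is a mnemonic rather than real math, but once the metrics in Part 5 exist it can be computed as a rough adoption health score. A sketch — the weights, ranges, and function name are illustrative assumptions, not part of the system:

```python
def adoption_score(
    low_noise: float,    # 1 - false-positive rate, in 0..1
    fast_scans: float,   # fraction of scans meeting the speed SLA, in 0..1
    auto_fix: float,     # fraction of findings with an auto-fix, in 0..1
    in_workflow: float,  # fraction of findings surfaced in PR/IDE, in 0..1
    friction: float,     # relative extra steps a developer must take, >= 1
) -> float:
    """Toy version of the adoption formula; higher is better."""
    return (low_noise + fast_scans + auto_fix + in_workflow) / friction

# A noisy, out-of-workflow scanner scores far below a tuned, in-workflow one
assert adoption_score(0.5, 0.6, 0.1, 0.2, friction=3) < adoption_score(0.95, 0.9, 0.6, 0.95, friction=1)
```

Tracking a number like this week over week makes the rollout goals at the end of this article concrete.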
Part 1: The Scanning Engine
The engine runs multiple scanners in parallel and normalizes their output into a unified finding format.
Unified Finding Schema
Every scanner produces different output. Normalize everything into one schema:
interface SecurityFinding {
id: string; // Deterministic hash for dedup
scanner: 'sast' | 'sca' | 'secret' | 'iac' | 'ai';
severity: 'critical' | 'high' | 'medium' | 'low' | 'info';
confidence: number; // 0-100, key for noise reduction
title: string;
description: string;
cwe?: string; // CWE-89 (SQL Injection), etc.
owasp?: string; // A03:2021 Injection
file: string;
startLine: number;
endLine: number;
snippet: string; // Code context around the finding
fix?: {
description: string;
diff: string; // Patch that fixes the issue
autoApplicable: boolean; // Can be applied without review?
};
metadata: {
rule: string; // Rule ID that triggered
scanner_version: string;
scan_duration_ms: number;
first_seen?: string; // For tracking regression vs new
commit_sha: string;
};
}
The Scan Orchestrator
This is the core service that coordinates all scanners:
import asyncio
import hashlib
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class ScanType(Enum):
SAST = "sast"
SCA = "sca"
SECRET = "secret"
IAC = "iac"
AI_REVIEW = "ai_review"
@dataclass
class ScanRequest:
repo: str
branch: str
commit_sha: str
changed_files: list[str] # Only scan what changed (speed!)
scan_types: list[ScanType] = field(default_factory=lambda: list(ScanType))
base_branch: str = "main" # For diff-aware scanning
@dataclass
class ScanResult:
findings: list[dict]
scanner: str
duration_ms: int
files_scanned: int
class ScanOrchestrator:
"""Orchestrate multiple security scanners in parallel."""
def __init__(self, config: dict):
self.scanners = {
ScanType.SAST: SASTScanner(config.get("sast", {})),
ScanType.SCA: SCAScanner(config.get("sca", {})),
ScanType.SECRET: SecretScanner(config.get("secret", {})),
ScanType.IAC: IaCScanner(config.get("iac", {})),
ScanType.AI_REVIEW: AIReviewScanner(config.get("ai", {})),
}
self.dedup_cache = {}
self.suppression_rules = SuppressionStore()
async def scan(self, request: ScanRequest) -> list[dict]:
"""Run all requested scanners in parallel, merge and deduplicate."""
start = time.monotonic()
# Run scanners in parallel
tasks = []
for scan_type in request.scan_types:
scanner = self.scanners[scan_type]
tasks.append(
self._run_scanner(scanner, request)
)
results: list[ScanResult] = await asyncio.gather(*tasks)
# Merge all findings
all_findings = []
for result in results:
all_findings.extend(result.findings)
# Post-processing pipeline
findings = self._deduplicate(all_findings)
findings = self._apply_suppressions(findings)
findings = self._enrich_with_context(findings, request)
findings = self._prioritize(findings)
total_ms = int((time.monotonic() - start) * 1000)
print(f"Scan completed: {len(findings)} findings in {total_ms}ms")
return findings
async def _run_scanner(
self, scanner, request: ScanRequest
) -> ScanResult:
"""Run a single scanner with timeout."""
start = time.monotonic()
try:
findings = await asyncio.wait_for(
scanner.scan(
repo=request.repo,
files=request.changed_files,
commit=request.commit_sha,
),
timeout=300, # 5 min max per scanner
)
except asyncio.TimeoutError:
print(f"Scanner {scanner.name} timed out")
findings = []
duration = int((time.monotonic() - start) * 1000)
return ScanResult(
findings=findings,
scanner=scanner.name,
duration_ms=duration,
files_scanned=len(request.changed_files),
)
def _deduplicate(self, findings: list[dict]) -> list[dict]:
"""Remove duplicate findings across scanners."""
seen = set()
unique = []
for f in findings:
# Deterministic ID based on location + rule
dedup_key = hashlib.sha256(
f"{f['file']}:{f['startLine']}:{f['rule']}".encode()
).hexdigest()[:16]
if dedup_key not in seen:
seen.add(dedup_key)
f['id'] = dedup_key
unique.append(f)
return unique
def _apply_suppressions(self, findings: list[dict]) -> list[dict]:
"""Filter out suppressed findings (false positives marked by devs)."""
return [
f for f in findings
if not self.suppression_rules.is_suppressed(
rule=f['rule'],
file_pattern=f['file'],
reason=None,
)
]
def _prioritize(self, findings: list[dict]) -> list[dict]:
"""Sort by severity and confidence score."""
severity_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3, 'info': 4}
return sorted(
findings,
key=lambda f: (severity_order[f['severity']], -f.get('confidence', 50))
)
def _enrich_with_context(
self, findings: list[dict], request: ScanRequest
) -> list[dict]:
"""Add context: is this new or existing? Who last modified this code?"""
for f in findings:
f['is_new'] = f['metadata'].get('first_seen') is None  # first_seen is set once a finding has been recorded before
f['introduced_in'] = request.branch
return findings
SAST Scanner: Custom Semgrep Rules
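The SASTScanner the orchestrator instantiates can be a thin wrapper that shells out to Semgrep and maps its `--json` output into the unified finding schema. A sketch — the JSON field names follow Semgrep's output format, but the class shape, defaults, and severity mapping are assumptions:

```python
import json
import subprocess

# Map Semgrep severities onto the unified schema's levels (an assumed mapping)
SEVERITY_MAP = {"ERROR": "high", "WARNING": "medium", "INFO": "low"}

def semgrep_json_to_findings(output: dict, commit: str) -> list[dict]:
    """Map Semgrep's JSON results into the unified finding schema."""
    findings = []
    for r in output.get("results", []):
        meta = r.get("extra", {}).get("metadata", {})
        findings.append({
            "scanner": "sast",
            "severity": SEVERITY_MAP.get(r["extra"]["severity"], "medium"),
            "confidence": meta.get("confidence", 50),  # set by our custom rules
            "title": r["check_id"].split(".")[-1],
            "description": r["extra"]["message"],
            "cwe": meta.get("cwe"),
            "file": r["path"],
            "startLine": r["start"]["line"],
            "endLine": r["end"]["line"],
            "snippet": r["extra"].get("lines", ""),
            "rule": r["check_id"],
            "metadata": {"rule": r["check_id"], "commit_sha": commit,
                         "scanner_version": "semgrep", "scan_duration_ms": 0},
        })
    return findings

class SASTScanner:
    name = "sast"

    def __init__(self, config: dict):
        self.rules = config.get("rules", ".semgrep/custom-rules.yml")

    async def scan(self, repo: str, files: list[str], commit: str) -> list[dict]:
        # Blocking subprocess call, same trade-off as the SCA scanner below;
        # in production you would push this onto a thread executor
        proc = subprocess.run(
            ["semgrep", "scan", "--json", "--quiet", "--config", self.rules,
             *[f"{repo}/{f}" for f in files]],
            capture_output=True, text=True, timeout=240,
        )
        return semgrep_json_to_findings(json.loads(proc.stdout or "{}"), commit)
```

The normalization lives in a pure function so it can be unit-tested without running Semgrep at all.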
Semgrep is the backbone for SAST. Write custom rules for your codebase patterns:
# rules/custom-sast-rules.yml
rules:
# Detect unsafe database queries
- id: sql-string-concat
severity: ERROR
message: |
SQL query built with string formatting. Use parameterized queries.
Fix: Replace with `db.query('SELECT ... WHERE id = $1', [userId])`
languages: [python, javascript, typescript]
patterns:
- pattern-either:
- pattern: $DB.query(f"...", ...)
- pattern: $DB.query("..." + $VAR, ...)
- pattern: $DB.execute(f"...", ...)
- pattern: cursor.execute(f"...", ...)
metadata:
cwe: "CWE-89"
owasp: "A03:2021"
confidence: 95
auto_fix: true
# Detect hardcoded secrets
- id: hardcoded-api-key
severity: WARNING
message: |
Possible hardcoded API key. Use environment variables or a secrets manager.
languages: [python, javascript, typescript, java, go]
pattern-regex: '(?i)(api[_-]?key|secret[_-]?key|access[_-]?token)\s*[:=]\s*["\x27][A-Za-z0-9+/=_\-]{20,}["\x27]'
metadata:
cwe: "CWE-798"
confidence: 80
# Detect missing auth checks in Express routes
- id: express-route-no-auth
severity: WARNING
message: |
Route handler without authentication middleware. Add `requireAuth` middleware.
languages: [javascript, typescript]
patterns:
- pattern: |
app.$METHOD($PATH, async (req, res) => { ... })
- pattern-not: |
app.$METHOD($PATH, requireAuth, async (req, res) => { ... })
- pattern-not: |
app.$METHOD($PATH, authenticate, async (req, res) => { ... })
- metavariable-regex:
metavariable: $METHOD
regex: (post|put|patch|delete)
metadata:
cwe: "CWE-306"
confidence: 70
# Detect eval with user input
- id: eval-user-input
severity: ERROR
message: |
eval() with potentially user-controlled input. This enables remote code execution.
languages: [python, javascript]
patterns:
- pattern-either:
- pattern: eval($REQ. ...)
- pattern: eval(request. ...)
- pattern: |
$X = request. ...
...
eval($X)
metadata:
cwe: "CWE-94"
confidence: 90
SCA Scanner: Dependency Vulnerability Check
import json
import subprocess
from pathlib import Path
class SCAScanner:
"""Software Composition Analysis — find vulnerable dependencies."""
name = "sca"
async def scan(self, repo: str, files: list[str], commit: str) -> list[dict]:
findings = []
# Check if lockfiles changed (only full scan if deps changed)
lockfiles = [f for f in files if Path(f).name in (  # match lockfiles anywhere in the tree
'package-lock.json', 'yarn.lock', 'pnpm-lock.yaml',
'requirements.txt', 'Pipfile.lock', 'poetry.lock',
'go.sum', 'Cargo.lock', 'Gemfile.lock',
)]
if not lockfiles:
return [] # No dependency changes, skip SCA
# Run Trivy for dependency scanning
result = subprocess.run(
[
"trivy", "fs",
"--scanners", "vuln",
"--severity", "CRITICAL,HIGH,MEDIUM",
"--format", "json",
"--quiet",
repo,
],
capture_output=True,
text=True,
timeout=120,
)
if result.returncode != 0 and not result.stdout:
return []
trivy_output = json.loads(result.stdout)
for target in trivy_output.get("Results", []):
for vuln in target.get("Vulnerabilities", []):
findings.append({
"scanner": "sca",
"severity": vuln["Severity"].lower(),
"confidence": 95, # CVE matches are high confidence
"title": f"{vuln['VulnerabilityID']}: {vuln['PkgName']}",
"description": vuln.get("Description", ""),
"cwe": vuln.get("CweIDs", [None])[0],
"file": target["Target"],
"startLine": 0,
"endLine": 0,
"snippet": f"{vuln['PkgName']} {vuln['InstalledVersion']}",
"fix": self._build_fix(vuln),
"rule": vuln["VulnerabilityID"],
"metadata": {
"rule": vuln["VulnerabilityID"],
"installed_version": vuln["InstalledVersion"],
"fixed_version": vuln.get("FixedVersion", "N/A"),
"cvss_score": vuln.get("CVSS", {}).get("nvd", {}).get("V3Score"),
"commit_sha": commit,
"scanner_version": "trivy",
"scan_duration_ms": 0,
},
})
return findings
def _build_fix(self, vuln: dict) -> dict | None:
fixed = vuln.get("FixedVersion")
if not fixed:
return None
return {
"description": f"Upgrade {vuln['PkgName']} to {fixed}",
"diff": f"-{vuln['PkgName']}@{vuln['InstalledVersion']}\n+{vuln['PkgName']}@{fixed}",
"autoApplicable": True,
}
Secret Scanner
import re
import math
from collections import Counter
class SecretScanner:
"""Detect secrets, API keys, and credentials in source code."""
name = "secret"
# High-confidence patterns (verified format)
SECRET_PATTERNS = [
("AWS Access Key", r'AKIA[0-9A-Z]{16}', 99),
("AWS Secret Key", r'(?i)aws_secret_access_key\s*=\s*["\x27]([A-Za-z0-9/+=]{40})["\x27]', 95),
("GitHub Token", r'ghp_[A-Za-z0-9]{36}', 99),
("GitHub OAuth", r'gho_[A-Za-z0-9]{36}', 99),
("Slack Token", r'xox[bpors]-[A-Za-z0-9\-]+', 95),
("Stripe Key", r'sk_live_[A-Za-z0-9]{24,}', 99),
("Private Key", r'-----BEGIN (RSA |EC |DSA )?PRIVATE KEY-----', 99),
("Generic Secret", r'(?i)(password|secret|token|api_key)\s*[:=]\s*["\x27]([A-Za-z0-9+/=_\-]{16,})["\x27]', 70),
]
async def scan(self, repo: str, files: list[str], commit: str) -> list[dict]:
findings = []
# Skip binary files and lockfiles
scannable = [
f for f in files
if not f.endswith(('.lock', '.png', '.jpg', '.woff', '.min.js'))
]
for filepath in scannable:
try:
full_path = f"{repo}/{filepath}"
with open(full_path, 'r', errors='ignore') as f:
lines = f.readlines()
except (FileNotFoundError, IsADirectoryError):
continue
for line_num, line in enumerate(lines, 1):
# Skip comment lines
stripped = line.strip()
if stripped.startswith(('#', '//', '*', '<!--')):
continue
for name, pattern, confidence in self.SECRET_PATTERNS:
match = re.search(pattern, line)
if match:
# Entropy check for generic patterns
if confidence < 80:
secret_val = match.group(2) if match.lastindex and match.lastindex >= 2 else match.group()
entropy = self._shannon_entropy(secret_val)
if entropy < 3.5:
continue # Low entropy = likely not a real secret
findings.append({
"scanner": "secret",
"severity": "critical" if confidence >= 90 else "high",
"confidence": confidence,
"title": f"Potential {name} detected",
"description": f"Found what appears to be a {name} in source code. Secrets should be stored in environment variables or a vault.",
"cwe": "CWE-798",
"file": filepath,
"startLine": line_num,
"endLine": line_num,
"snippet": self._redact_secret(line.strip()),
"fix": {
"description": f"Move this {name} to an environment variable or secrets manager",
"diff": "",
"autoApplicable": False,
},
"rule": f"secret-{name.lower().replace(' ', '-')}",
"metadata": {
"rule": f"secret-{name.lower().replace(' ', '-')}",
"commit_sha": commit,
"scanner_version": "secret-scanner-v1",
"scan_duration_ms": 0,
},
})
return findings
def _shannon_entropy(self, data: str) -> float:
"""Calculate Shannon entropy to distinguish real secrets from placeholders."""
if not data:
return 0.0
counter = Counter(data)
length = len(data)
return -sum(
(count / length) * math.log2(count / length)
for count in counter.values()
)
def _redact_secret(self, line: str) -> str:
"""Show first 4 and last 4 chars, redact the middle."""
# Redact anything that looks like a secret value
def redact_match(m):
val = m.group()
if len(val) > 10:
return val[:4] + '*' * (len(val) - 8) + val[-4:]
return '****'
return re.sub(r'[A-Za-z0-9+/=_\-]{16,}', redact_match, line)
Part 2: The Developer Experience Layer
This is where adoption lives or dies. Findings must appear where developers already work.
GitHub PR Integration
The most impactful integration — 90% of findings shown in PRs get fixed:
import httpx
class GitHubPRReporter:
"""Post scan findings as inline PR review comments."""
def __init__(self, github_token: str):
self.token = github_token
self.client = httpx.AsyncClient(
base_url="https://api.github.com",
headers={
"Authorization": f"Bearer {github_token}",
"Accept": "application/vnd.github.v3+json",
},
)
async def report_findings(
self,
owner: str,
repo: str,
pr_number: int,
commit_sha: str,
findings: list[dict],
):
"""Create a PR review with inline comments for each finding."""
if not findings:
# Leave a positive comment if no issues found
await self._post_comment(
owner, repo, pr_number,
"## Security Scan: All Clear\nNo security issues found in this PR."
)
return
# Group findings by severity for summary
summary = self._build_summary(findings)
# Build review comments (inline on the code)
comments = []
for finding in findings:
body = self._format_finding(finding)
comments.append({
"path": finding["file"],
"line": finding["startLine"],
"side": "RIGHT",
"body": body,
})
# Create the review
review_body = {
"commit_id": commit_sha,
"body": summary,
"event": self._review_event(findings),
"comments": comments[:50], # GitHub limits to 50 inline comments
}
response = await self.client.post(
f"/repos/{owner}/{repo}/pulls/{pr_number}/reviews",
json=review_body,
)
response.raise_for_status()
def _format_finding(self, finding: dict) -> str:
"""Format a single finding as a PR comment."""
severity_emoji = {
'critical': '🔴', 'high': '🟠',
'medium': '🟡', 'low': '🔵', 'info': '⚪'
}
emoji = severity_emoji.get(finding['severity'], '⚪')
body = f"### {emoji} {finding['title']}\n\n"
body += f"**Severity:** {finding['severity'].upper()} "
body += f"(confidence: {finding.get('confidence', 'N/A')}%)\n\n"
body += f"{finding['description']}\n\n"
if finding.get('cwe'):
body += f"**CWE:** [{finding['cwe']}](https://cwe.mitre.org/data/definitions/{finding['cwe'].split('-')[1]}.html)\n\n"
if finding.get('fix'):
body += "**Suggested Fix:**\n"
body += f"{finding['fix']['description']}\n\n"
if finding['fix'].get('diff'):
body += f"```diff\n{finding['fix']['diff']}\n```\n\n"
if finding['fix'].get('autoApplicable'):
body += "> **This fix can be auto-applied.** "
body += "Comment `/fix` to apply.\n\n"
body += "<details><summary>Dismiss this finding</summary>\n\n"
body += f"If this is a false positive, comment `/suppress {finding['id']}` "
body += "with a reason.\n\n</details>"
return body
def _build_summary(self, findings: list[dict]) -> str:
"""Build the review summary."""
by_severity = {}
for f in findings:
by_severity.setdefault(f['severity'], []).append(f)
summary = "## Security Scan Results\n\n"
summary += f"Found **{len(findings)}** issue(s) in this PR:\n\n"
summary += "| Severity | Count |\n|----------|-------|\n"
for sev in ['critical', 'high', 'medium', 'low']:
count = len(by_severity.get(sev, []))
if count:
summary += f"| {sev.upper()} | {count} |\n"
auto_fixable = sum(1 for f in findings if f.get('fix', {}).get('autoApplicable'))
if auto_fixable:
summary += f"\n**{auto_fixable}** finding(s) have auto-fix available. "
summary += "Comment `/fix-all` to apply all safe fixes.\n"
return summary
def _review_event(self, findings: list[dict]) -> str:
"""Determine review action based on severity."""
has_critical = any(f['severity'] == 'critical' for f in findings)
has_high = any(f['severity'] == 'high' for f in findings)
if has_critical or has_high:
return "REQUEST_CHANGES" # Block the PR
return "COMMENT"  # Non-blocking
IDE Plugin (VS Code Extension)
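The extension below POSTs file contents to a scan daemon on localhost port 7890. A minimal daemon can run only cheap, regex-based checks so responses stay inside the IDE's latency budget. A sketch — the endpoint shape matches the extension, but the rules, handler, and module layout are assumptions:

```python
import json
import re
from http.server import BaseHTTPRequestHandler, HTTPServer

# Cheap regex checks only; the heavy scanners stay in CI
QUICK_RULES = [
    ("critical", r'AKIA[0-9A-Z]{16}', "Possible AWS access key"),
    ("high", r'(?i)(password|secret|api_key)\s*[:=]\s*["\'][^"\']{16,}["\']',
     "Possible hardcoded secret"),
    ("high", r'\beval\(', "eval() call — check for user-controlled input"),
]

def quick_scan(content: str) -> list[dict]:
    """Return findings in the shape the VS Code extension expects."""
    findings = []
    for line_num, line in enumerate(content.splitlines(), 1):
        for severity, pattern, message in QUICK_RULES:
            if re.search(pattern, line):
                findings.append({"line": line_num, "severity": severity,
                                 "message": message})
    return findings

class ScanHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        body = json.loads(self.rfile.read(int(self.headers["Content-Length"])))
        result = json.dumps(quick_scan(body.get("content", ""))).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(result)

def serve(port: int = 7890) -> None:
    """Start the daemon; the extension retries silently if it's not running."""
    HTTPServer(("127.0.0.1", port), ScanHandler).serve_forever()
```

Keeping `quick_scan` pure makes it trivial to test and to reuse in the pre-commit hook.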
Real-time scanning as developers type — catches issues before they even commit:
// vscode-extension/src/scanner.ts
import * as vscode from 'vscode';
interface InlineFinding {
file: string;
line: number;
severity: string;
message: string;
fix?: string;
}
export class SecurityScanner {
private diagnostics: vscode.DiagnosticCollection;
private debounceTimer: NodeJS.Timeout | null = null;
constructor(context: vscode.ExtensionContext) {
this.diagnostics = vscode.languages.createDiagnosticCollection('security');
context.subscriptions.push(this.diagnostics);
// Scan on save (fast, incremental)
vscode.workspace.onDidSaveTextDocument((doc) => {
this.scanFile(doc);
});
// Scan on type with debounce (lighter checks only)
vscode.workspace.onDidChangeTextDocument((event) => {
if (this.debounceTimer) clearTimeout(this.debounceTimer);
this.debounceTimer = setTimeout(() => {
this.scanFile(event.document);
}, 1000); // Wait 1s after typing stops
});
}
private async scanFile(document: vscode.TextDocument) {
const filePath = document.uri.fsPath;
const supportedLangs = [
'javascript', 'typescript', 'python', 'java', 'go'
];
if (!supportedLangs.includes(document.languageId)) return;
try {
// Call the scan API (local daemon or remote service)
const response = await fetch('http://localhost:7890/scan', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
file: filePath,
content: document.getText(),
language: document.languageId,
}),
});
const findings: InlineFinding[] = await response.json();
this.showDiagnostics(document.uri, findings);
} catch {
// Silently fail — don't interrupt developer workflow
}
}
private showDiagnostics(
uri: vscode.Uri,
findings: InlineFinding[]
) {
const diags: vscode.Diagnostic[] = findings.map((f) => {
const range = new vscode.Range(
f.line - 1, 0, f.line - 1, Number.MAX_VALUE
);
const severity = f.severity === 'critical' || f.severity === 'high'
? vscode.DiagnosticSeverity.Error
: f.severity === 'medium'
? vscode.DiagnosticSeverity.Warning
: vscode.DiagnosticSeverity.Information;
const diag = new vscode.Diagnostic(range, f.message, severity);
diag.source = 'Security Scanner';
diag.code = f.fix ? { value: 'auto-fix', target: uri } : undefined;
return diag;
});
this.diagnostics.set(uri, diags);
}
}
Pre-Commit Hook
The last gate before code reaches the repo:
# .pre-commit-config.yaml
repos:
# Secret scanning (fastest, run first)
- repo: https://github.com/gitleaks/gitleaks
rev: v8.18.0
hooks:
- id: gitleaks
name: Detect secrets
args: ['--verbose']
# SAST with Semgrep
- repo: https://github.com/semgrep/semgrep
rev: v1.60.0
hooks:
- id: semgrep
name: Security scan (SAST)
args:
- --config=p/owasp-top-ten
- --config=.semgrep/custom-rules.yml
- --error # Fail on findings
- --severity=ERROR # Only block on high/critical
- --quiet
# Dependency check (only on lockfile changes)
- repo: local
hooks:
- id: dependency-check
name: Check vulnerable dependencies
entry: bash -c 'trivy fs --scanners vuln --severity CRITICAL,HIGH --exit-code 1 --quiet .'
language: system
files: '(package-lock\.json|yarn\.lock|requirements\.txt|go\.sum)$'
pass_filenames: false
Part 3: The Feedback Loop
This is what separates a static tool from an improving system. Developers tell you what’s wrong, and the system gets smarter.
Suppression System
When a developer marks a finding as false positive, capture the context and use it to tune rules:
from datetime import datetime
from enum import Enum
class DismissReason(Enum):
FALSE_POSITIVE = "false_positive"
ACCEPTED_RISK = "accepted_risk"
MITIGATED_ELSEWHERE = "mitigated_elsewhere"
TEST_CODE = "test_code"
WILL_FIX_LATER = "will_fix_later"
class SuppressionStore:
"""Track developer feedback on findings to reduce noise."""
def __init__(self, db):
self.db = db
async def suppress(
self,
finding_id: str,
rule: str,
file_pattern: str,
reason: DismissReason,
comment: str,
user: str,
):
"""Record a suppression from developer feedback."""
await self.db.execute(
"""INSERT INTO suppressions
(finding_id, rule, file_pattern, reason, comment, user_id, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)""",
finding_id, rule, file_pattern, reason.value,
comment, user, datetime.utcnow(),
)
# Track suppression rate per rule for tuning
await self._update_rule_stats(rule, reason)
async def _update_rule_stats(self, rule: str, reason: DismissReason):
"""Track false positive rate per rule."""
if reason == DismissReason.FALSE_POSITIVE:
await self.db.execute(
"""UPDATE rule_stats
SET false_positive_count = false_positive_count + 1,
total_count = total_count + 1,
last_fp_at = $2
WHERE rule_id = $1""",
rule, datetime.utcnow(),
)
# Auto-disable rules with >30% false positive rate
stats = await self.db.fetchrow(
"SELECT * FROM rule_stats WHERE rule_id = $1", rule
)
if stats and stats['total_count'] >= 20:
fp_rate = stats['false_positive_count'] / stats['total_count']
if fp_rate > 0.30:
await self._disable_rule(rule, f"Auto-disabled: {fp_rate:.0%} FP rate")
async def get_rule_health(self) -> list[dict]:
"""Dashboard: show which rules are noisy."""
return await self.db.fetch("""
SELECT
rule_id,
total_count,
false_positive_count,
ROUND(false_positive_count::numeric / NULLIF(total_count, 0), 2) as fp_rate,
last_fp_at,
enabled
FROM rule_stats
ORDER BY fp_rate DESC
""")
Auto-Fix Generation
The highest-impact feature for adoption — suggest a fix, or better, apply it:
import re
class AutoFixGenerator:
"""Generate code fixes for common vulnerability patterns."""
FIX_TEMPLATES = {
"sql-string-concat": {
"python": {
"pattern": r'cursor\.execute\(f"([^"]*)\{(\w+)\}([^"]*)"',
"replacement": r'cursor.execute("\1%s\3", (\2,))',
"description": "Convert f-string SQL to parameterized query",
},
"javascript": {
"pattern": r'db\.query\(`([^`]*)\$\{(\w+)\}([^`]*)`\)',
"replacement": r'db.query("\1$1\3", [\2])',
"description": "Convert template literal SQL to parameterized query",
},
},
"hardcoded-api-key": {
"python": {
"pattern": r'(\w+)\s*=\s*["\x27]([A-Za-z0-9+/=_\-]{20,})["\x27]',
"replacement": r'\1 = os.environ["\1".upper()]',
"description": "Replace hardcoded secret with environment variable",
},
},
}
def generate_fix(
self, finding: dict, file_content: str, language: str
) -> dict | None:
"""Generate a fix diff for the given finding."""
rule = finding.get("rule", "")
template = self.FIX_TEMPLATES.get(rule, {}).get(language)
if not template:
return None
lines = file_content.split('\n')
line_idx = finding['startLine'] - 1
if line_idx >= len(lines):
return None
original_line = lines[line_idx]
fixed_line = re.sub(template['pattern'], template['replacement'], original_line)
if fixed_line == original_line:
return None
return {
"description": template['description'],
"diff": f"- {original_line.strip()}\n+ {fixed_line.strip()}",
"autoApplicable": True,
"patched_content": '\n'.join(
lines[:line_idx] + [fixed_line] + lines[line_idx + 1:]
),
}
Part 4: The CI/CD Pipeline Integration
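The workflow below uploads `scan_results.sarif` to GitHub's Security tab, so the orchestrator's JSON findings need a SARIF conversion step between scanning and upload. A minimal converter — a sketch emitting only the fields a SARIF 2.1.0 consumer requires; the tool name and level mapping are assumptions:

```python
# Unified severities onto SARIF levels (error / warning / note)
LEVEL_MAP = {"critical": "error", "high": "error", "medium": "warning",
             "low": "note", "info": "note"}

def findings_to_sarif(findings: list[dict]) -> dict:
    """Convert unified findings into a minimal SARIF 2.1.0 document."""
    return {
        "version": "2.1.0",
        "$schema": "https://json.schemastore.org/sarif-2.1.0.json",
        "runs": [{
            "tool": {"driver": {
                "name": "scan-orchestrator",
                "rules": [{"id": rule}
                          for rule in sorted({f["rule"] for f in findings})],
            }},
            "results": [{
                "ruleId": f["rule"],
                "level": LEVEL_MAP.get(f["severity"], "warning"),
                "message": {"text": f["title"]},
                "locations": [{"physicalLocation": {
                    "artifactLocation": {"uri": f["file"]},
                    # SARIF requires startLine >= 1; SCA findings use line 0
                    "region": {"startLine": max(f["startLine"], 1)},
                }}],
            } for f in findings],
        }],
    }
```

Run this converter in a step between the scan and the `upload-sarif` action, writing its output to `scan_results.sarif`.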
# .github/workflows/security-scan.yml
name: Security Scan
on:
pull_request:
types: [opened, synchronize]
jobs:
security-scan:
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: write # Required for PR comments
security-events: write # Required for SARIF upload
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0 # Need full history for diff
- name: Get changed files
id: changed
run: |
echo "files=$(git diff --name-only origin/${{ github.base_ref }}...HEAD | tr '\n' ',')" >> $GITHUB_OUTPUT
- name: Run security scan
id: scan
run: |
# Call your scan orchestrator API
curl -s -X POST http://scanner.internal/scan \
-H "Content-Type: application/json" \
-d '{
"repo": "${{ github.repository }}",
"branch": "${{ github.head_ref }}",
"commit": "${{ github.sha }}",
"changed_files": "${{ steps.changed.outputs.files }}",
"pr_number": ${{ github.event.pull_request.number }}
}' > scan_results.json
# Check for blockers
CRITICAL=$(jq '[.findings[] | select(.severity=="critical")] | length' scan_results.json)
HIGH=$(jq '[.findings[] | select(.severity=="high")] | length' scan_results.json)
echo "critical=$CRITICAL" >> $GITHUB_OUTPUT
echo "high=$HIGH" >> $GITHUB_OUTPUT
# Upload SARIF for GitHub Security tab
- name: Upload SARIF
if: always()
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: scan_results.sarif
# Block PR on critical/high findings
- name: Security gate
if: steps.scan.outputs.critical > 0 || steps.scan.outputs.high > 0
run: |
echo "::error::Security scan found ${{ steps.scan.outputs.critical }} critical and ${{ steps.scan.outputs.high }} high severity issues"
exit 1
Part 5: Metrics That Matter
Track these to prove the system works and justify continued investment:
-- Dashboard queries for security metrics
-- 1. Mean Time to Remediate (MTTR) by severity
SELECT
severity,
AVG(EXTRACT(EPOCH FROM (resolved_at - created_at)) / 3600) as avg_hours_to_fix,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY
EXTRACT(EPOCH FROM (resolved_at - created_at)) / 3600
) as median_hours_to_fix
FROM findings
WHERE resolved_at IS NOT NULL
AND created_at >= NOW() - INTERVAL '30 days'
GROUP BY severity;
-- 2. False positive rate by rule (which rules need tuning?)
SELECT
rule_id,
total_count,
false_positive_count,
ROUND(false_positive_count::numeric / NULLIF(total_count, 0) * 100, 1) as fp_percent,
enabled
FROM rule_stats
WHERE total_count >= 10
ORDER BY fp_percent DESC
LIMIT 20;
-- 3. Fix rate: what % of findings get fixed?
SELECT
DATE_TRUNC('week', created_at) as week,
COUNT(*) as total_findings,
COUNT(*) FILTER (WHERE status = 'fixed') as fixed,
COUNT(*) FILTER (WHERE status = 'suppressed') as suppressed,
COUNT(*) FILTER (WHERE status = 'open') as open,
ROUND(
COUNT(*) FILTER (WHERE status = 'fixed')::numeric / COUNT(*) * 100, 1
) as fix_rate_percent
FROM findings
WHERE created_at >= NOW() - INTERVAL '90 days'
GROUP BY week
ORDER BY week;
-- 4. Developer engagement: who's fixing, who's suppressing?
SELECT
resolved_by as developer,
COUNT(*) FILTER (WHERE status = 'fixed') as findings_fixed,
COUNT(*) FILTER (WHERE status = 'suppressed') as findings_suppressed,
AVG(EXTRACT(EPOCH FROM (resolved_at - created_at)) / 3600) as avg_hours_to_resolve
FROM findings
WHERE resolved_at IS NOT NULL
AND created_at >= NOW() - INTERVAL '30 days'
GROUP BY resolved_by
ORDER BY findings_fixed DESC;
-- 5. Scanner performance: speed + accuracy
SELECT
scanner,
AVG(scan_duration_ms) as avg_scan_ms,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY scan_duration_ms) as p95_scan_ms,
COUNT(*) as total_scans,
SUM(findings_count) as total_findings,
SUM(false_positive_count) as total_fps,
ROUND(SUM(false_positive_count)::numeric / NULLIF(SUM(findings_count), 0) * 100, 1) as fp_rate
FROM scan_runs
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY scanner;
SLA Targets
Security SLA Targets:
│
├── Scan Performance
│ ├── Incremental scan: < 2 minutes
│ ├── Full scan: < 10 minutes
│ └── IDE scan: < 5 seconds per file
│
├── Accuracy
│ ├── False positive rate: < 5%
│ ├── Critical/high detection rate: > 95%
│ └── Auto-fix availability: > 60% of findings
│
├── Remediation SLAs
│ ├── Critical: Fix within 24 hours
│ ├── High: Fix within 7 days
│ ├── Medium: Fix within 30 days
│ └── Low: Fix within 90 days or accept risk
│
└── Adoption
├── PR scan coverage: > 95% of PRs
├── Finding fix rate: > 80%
├── Developer satisfaction: > 7/10
└── Suppression rate: < 15% (indicates noise level)
Rollout Strategy: Don’t Boil the Ocean
The biggest mistake is launching everything at once in blocking mode. Here’s the rollout plan:
Week 1-2: Shadow Mode
├── Enable scanning on all PRs
├── Post results to a Slack channel (not on PRs)
├── Measure false positive rate
├── Tune rules based on noise
└── Goal: < 20% false positive rate
Week 3-4: Advisory Mode
├── Post findings as non-blocking PR comments
├── Enable suppression workflow
├── Track which findings get fixed vs ignored
├── Continue tuning rules
└── Goal: < 10% false positive rate
Week 5-8: Blocking Mode (Critical Only)
├── Block PRs on critical findings only
├── Auto-fix available for common patterns
├── Weekly rule health review
├── Onboard team leads as security champions
└── Goal: < 5% false positive rate
Month 3+: Full Enforcement
├── Block on critical + high findings
├── IDE plugin available for early detection
├── Monthly security metrics report
├── Quarterly rule review with dev team
└── Goal: Steady state, continuous improvement
Conclusion
The secret to building a vulnerability detection system that developers actually use isn’t better detection — it’s better UX. Every decision should pass the adoption formula test:
Does this reduce friction? If not, rethink it.
The core principles:
- Scan fast — incremental, parallel, under 5 minutes
- Report where developers are — PR comments, IDE, Slack. Never a separate portal
- Be accurate — under 5% false positives, or developers will ignore you
- Suggest fixes — every finding should tell them how to fix it, not just what’s wrong
- Get smarter — the feedback loop turns developer dismissals into better rules
- Roll out gradually — shadow → advisory → blocking. Earn trust before enforcing
Build for developers first. Security follows.
Building a security scanning system at your org? Share this with your AppSec team — the best security tools are the ones developers don’t fight against.