PDF Tamper Detection in Python: Integrate in Under 50 Lines

This article is a snapshot — content was accurate as of May 2026 (code examples tested against the API as of April 2026). The product evolves actively; specific counts, examples, and detection rules may have changed since publication — see the changelog for the current state.
Your application accepts PDFs from users — loan applications, identity documents, invoices, contracts. Any of them may have been edited after the issuing institution generated them. You have no original to compare against, and manual visual inspection does not scale.
This tutorial shows you how to integrate the HTPBE API into a Python application to detect post-creation modifications forensically. By the end you will have production-ready code for single-document checks, verdict routing, batch processing, and async poll loops.
How PDF tamper detection works
Submit a PDF URL via POST /v1/analyze. Get back a check ID. Retrieve the verdict with GET /v1/result/{id}. The API analyzes the file’s internal structure — metadata timestamps, cross-reference tables, digital signatures, producer/creator fields — and returns one of three verdicts: intact, modified, or inconclusive.
See the how it works page for a deeper explanation of the forensic layers. No file upload required, no SDK to install. Standard HTTP.
Prerequisites
```bash
pip install requests
```

Get your API key at htpbe.tech/auth/signup. Every plan includes a free test key you can use immediately without consuming live quota.
Set it as an environment variable:

```bash
export HTPBE_API_KEY="your_api_key_here"
```

Step 1 — Basic integration
The minimal integration: submit a URL, retrieve the result, print the verdict.
```python
import os

import requests

API_KEY = os.environ["HTPBE_API_KEY"]
BASE_URL = "https://api.htpbe.tech/v1"

HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}

def check_pdf(pdf_url: str) -> dict:
    """Submit a PDF URL for forensic analysis and return the result."""
    # Step 1: submit the URL
    submit = requests.post(
        f"{BASE_URL}/analyze",
        headers=HEADERS,
        json={"url": pdf_url},
        timeout=30,
    )
    submit.raise_for_status()
    check_id = submit.json()["id"]

    # Step 2: retrieve the verdict
    result = requests.get(
        f"{BASE_URL}/result/{check_id}",
        headers=HEADERS,
        timeout=30,
    )
    result.raise_for_status()
    return result.json()

if __name__ == "__main__":
    report = check_pdf("https://your-storage.example.com/documents/statement.pdf")
    print(report["status"])  # intact | modified | inconclusive
```

The API is synchronous for most documents. Submit and retrieve in two sequential requests. Analysis typically completes in under two seconds.
Step 2 — Handle all three verdicts
A production integration must act differently on each verdict. The routing function below also reads the named markers, producer, and xref_count from the response.
```python
def route_document(pdf_url: str, document_type: str = "generic") -> dict:
    """
    Analyze a PDF and return a routing decision.

    Returns a dict with keys:
    - action: "accept" | "reject" | "review"
    - check_id: str (store this for audit purposes)
    - reason: str
    """
    report = check_pdf(pdf_url)
    status = report["status"]
    check_id = report["id"]
    markers = report.get("modification_markers", [])
    producer = report.get("producer") or "unknown"
    xref_count = report.get("xref_count", 1)

    if status == "intact":
        return {
            "action": "accept",
            "check_id": check_id,
            "reason": "Document structure is consistent with original issuance.",
        }

    if status == "modified":
        reason_parts = [f"Forensic markers: {', '.join(markers)}."]
        if xref_count > 1:
            reason_parts.append(f"File was saved {xref_count} times after creation.")
        if producer != "unknown":
            reason_parts.append(f"Last processed by: {producer}.")
        return {
            "action": "reject",
            "check_id": check_id,
            "reason": " ".join(reason_parts),
        }

    # status == "inconclusive"
    # The document was created with consumer software and lacks institutional metadata.
    # For documents that should come from banks, tax authorities, or payroll systems,
    # inconclusive is a strong signal of fraud — treat it accordingly.
    institutional_types = {"bank_statement", "tax_document", "payslip", "insurance_policy"}
    if document_type in institutional_types:
        return {
            "action": "reject",
            "check_id": check_id,
            "reason": (
                f"Document origin ({producer}) is inconsistent with a {document_type}. "
                "Expected institutional software; consumer software detected."
            ),
        }

    return {
        "action": "review",
        "check_id": check_id,
        "reason": "Document created with consumer software. Route to manual review.",
    }
```

What inconclusive means
inconclusive does not mean the check failed. It means the document was created with consumer software — Microsoft Word, Google Docs, LibreOffice, Canva — and does not carry the structural fingerprint of an institutional document system.
For a user-uploaded CV or a self-drafted letter, this result is normal. For a document that claims to be a bank statement, payslip, or tax certificate, inconclusive is a strong fraud signal: real bank systems do not generate PDFs with Google Docs.
The modification_markers array
When status is modified, the modification_markers array names the specific signals that triggered the verdict:
| Marker | What it means |
|---|---|
| INCREMENTAL_UPDATES | The file has more than one xref section — edited and re-saved after original creation |
| PRODUCER_MISMATCH | Creator and Producer fields name different tools (bank system vs. consumer editor) |
| DIFFERENT_DATES | Modification date is inconsistent with the claimed creation date |
| MODIFICATIONS_AFTER_SIGNATURE | Content was appended after a valid digital signature |
| SIGNATURE_REMOVED | A digital signature slot exists but the signature has been stripped |
MODIFICATIONS_AFTER_SIGNATURE and SIGNATURE_REMOVED carry "certain" confidence, meaning the verdict is cryptographically verifiable. All other markers produce "high" confidence. For workflows where false positives are costly, you may want to auto-reject only "certain" markers and route "high" markers to manual review.
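The confidence split described above can be expressed as a small helper. This is a sketch of the policy this section suggests, not an official SDK function; the marker names come from the table above:

```python
# Markers the API reports with "certain" confidence (cryptographically verifiable).
CERTAIN_MARKERS = {"MODIFICATIONS_AFTER_SIGNATURE", "SIGNATURE_REMOVED"}

def action_for_markers(markers: list[str]) -> str:
    """Auto-reject only certain-confidence markers; route the rest to review."""
    if CERTAIN_MARKERS & set(markers):
        return "reject"
    if markers:
        return "review"
    return "accept"
```

If false positives are costly in your workflow, wire a check like this into your routing logic in place of a blanket reject on every modified verdict.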
Step 3 — Batch processing
Process a list of PDF URLs, collect results, and filter the modified cases.
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_check(
    pdf_urls: list[str],
    document_type: str = "generic",
    max_workers: int = 5,
) -> list[dict]:
    """
    Run forensic checks on multiple PDFs in parallel.

    Returns a list of result dicts sorted by the original URL order.
    Each dict includes the URL, routing decision, and raw report fields.
    """
    results: dict[str, dict] = {}

    def check_one(url: str) -> tuple[str, dict]:
        try:
            decision = route_document(url, document_type)
            # NOTE: route_document() already submits the document internally,
            # so this second call re-analyzes the same URL. For production,
            # refactor route_document() to also return the raw report so each
            # document is submitted only once.
            report = check_pdf(url)
            return url, {
                "url": url,
                "action": decision["action"],
                "check_id": decision["check_id"],
                "reason": decision["reason"],
                "status": report["status"],
                "markers": report.get("modification_markers", []),
                "producer": report.get("producer"),
                "xref_count": report.get("xref_count"),
                "error": None,
            }
        except requests.HTTPError as exc:
            return url, {
                "url": url,
                "action": "error",
                "check_id": None,
                "reason": f"HTTP {exc.response.status_code}",
                "status": None,
                "markers": [],
                "producer": None,
                "xref_count": None,
                "error": str(exc),
            }
        except Exception as exc:
            return url, {
                "url": url,
                "action": "error",
                "check_id": None,
                "reason": str(exc),
                "status": None,
                "markers": [],
                "producer": None,
                "xref_count": None,
                "error": str(exc),
            }

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(check_one, url): url for url in pdf_urls}
        for future in as_completed(futures):
            url, result = future.result()
            results[url] = result

    # preserve original order
    return [results[url] for url in pdf_urls]

def filter_flagged(results: list[dict]) -> list[dict]:
    """Return only rejected documents for downstream handling."""
    return [r for r in results if r["action"] == "reject"]

# Example usage
if __name__ == "__main__":
    urls = [
        "https://your-storage.example.com/docs/statement-jan.pdf",
        "https://your-storage.example.com/docs/statement-feb.pdf",
        "https://your-storage.example.com/docs/statement-mar.pdf",
    ]
    all_results = batch_check(urls, document_type="bank_statement")
    flagged = filter_flagged(all_results)
    print(f"Checked: {len(all_results)}, Flagged: {len(flagged)}")
    for doc in flagged:
        print(f"  REJECT {doc['url']}")
        print(f"    Markers: {doc['markers']}")
        print(f"    Producer: {doc['producer']}")
```

Keep max_workers at 5 or below. The API enforces rate limits per plan — parallel requests that exceed your plan's concurrency limit will receive 429 responses.
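If you do hit the limit, a thin retry wrapper keeps the batch running instead of failing. This is a sketch, not part of the HTPBE API: it checks for the standard HTTP Retry-After header on 429 responses, which this article does not confirm the API sends, and falls back to exponential backoff when it is absent:

```python
import time

import requests

def request_with_backoff(
    method: str,
    url: str,
    max_retries: int = 4,
    **kwargs,
) -> requests.Response:
    """Retry a request on HTTP 429, backing off exponentially between attempts."""
    delay = 1.0
    for attempt in range(max_retries + 1):
        response = requests.request(method, url, **kwargs)
        if response.status_code != 429 or attempt == max_retries:
            response.raise_for_status()
            return response
        # Honor Retry-After if the server sends it; otherwise back off exponentially.
        retry_after = response.headers.get("Retry-After")
        time.sleep(float(retry_after) if retry_after else delay)
        delay *= 2
    raise AssertionError("unreachable")
```

Usage mirrors the direct calls above, e.g. `request_with_backoff("POST", f"{BASE_URL}/analyze", headers=HEADERS, json={"url": pdf_url}, timeout=30)`.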
Step 4 — Async poll pattern (advanced)
For document intake pipelines where you submit files at one time and process results later — for example, when a user uploads a document during onboarding and your system checks it in the background — the submit-then-poll pattern decouples ingestion from analysis.
```python
import os
import time
from dataclasses import dataclass

import requests

API_KEY = os.environ["HTPBE_API_KEY"]
BASE_URL = "https://api.htpbe.tech/v1"

HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}

@dataclass
class PendingCheck:
    check_id: str
    pdf_url: str
    submitted_at: float

def submit_for_analysis(pdf_url: str) -> PendingCheck:
    """Submit a PDF URL and return a handle for later retrieval."""
    response = requests.post(
        f"{BASE_URL}/analyze",
        headers=HEADERS,
        json={"url": pdf_url},
        timeout=30,
    )
    response.raise_for_status()
    check_id = response.json()["id"]
    return PendingCheck(
        check_id=check_id,
        pdf_url=pdf_url,
        submitted_at=time.time(),
    )

def poll_result(
    pending: PendingCheck,
    poll_interval: float = 1.0,
    timeout: float = 30.0,
) -> dict:
    """
    Poll for a result until it is ready or the timeout expires.

    In practice the API returns synchronously, so the first poll
    almost always succeeds. This loop handles edge cases and network
    retries for long-running queues.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        response = requests.get(
            f"{BASE_URL}/result/{pending.check_id}",
            headers=HEADERS,
            timeout=30,
        )
        if response.status_code == 200:
            return response.json()
        if response.status_code == 404:
            # Not ready yet — wait and retry
            time.sleep(poll_interval)
            continue
        response.raise_for_status()
    raise TimeoutError(
        f"Result for {pending.check_id} not ready after {timeout}s"
    )

# Pipeline pattern: submit a batch, then retrieve when convenient
def intake_pipeline(urls: list[str]) -> list[dict]:
    # Phase 1: submit all documents
    pending_checks = []
    for url in urls:
        try:
            pending = submit_for_analysis(url)
            pending_checks.append(pending)
            print(f"Submitted {url} → {pending.check_id}")
        except requests.HTTPError as exc:
            print(f"Submit failed for {url}: {exc}")

    # Phase 2: retrieve results (could be in a separate job/worker)
    results = []
    for pending in pending_checks:
        try:
            result = poll_result(pending)
            results.append(result)
            print(f"{pending.check_id}: {result['status']}")
        except (TimeoutError, requests.HTTPError) as exc:
            print(f"Retrieval failed for {pending.check_id}: {exc}")
    return results
```

Store PendingCheck objects in your database (or a Redis queue) between the submit and retrieve phases. The check_id is stable — you can retrieve results days after submission using GET /v1/result/{check_id}.
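One simple way to persist the handle between the two phases is a JSON round-trip. A sketch only — the dataclass is redeclared here with the same fields as in Step 4 so the snippet stands alone, and how you key the records (jobs table, Redis value) is up to your infrastructure:

```python
import json
from dataclasses import asdict, dataclass

@dataclass
class PendingCheck:  # same fields as the PendingCheck defined in Step 4
    check_id: str
    pdf_url: str
    submitted_at: float

def to_record(pending: PendingCheck) -> str:
    """Serialize a handle for storage between the submit and retrieve phases."""
    return json.dumps(asdict(pending))

def from_record(raw: str) -> PendingCheck:
    """Rehydrate a handle in the retrieval worker."""
    return PendingCheck(**json.loads(raw))
```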
Full API response reference
A typical response from GET /v1/result/{id}:
```json
{
  "id": "ck_9f4a2e1b-3d7c-4a8e-b1f2-9e0d3c5a7b8f",
  "status": "modified",
  "modification_confidence": "high",
  "modification_markers": ["PRODUCER_MISMATCH", "INCREMENTAL_UPDATES"],
  "xref_count": 3,
  "has_digital_signature": false,
  "modifications_after_signature": false,
  "signature_removed": false,
  "creator": "HSBC Document Service",
  "producer": "Smallpdf",
  "creation_date": 1742688000,
  "modification_date": 1764547200
}
```

| Field | Type | Notes |
|---|---|---|
| status | "intact" / "modified" / "inconclusive" | Primary verdict |
| modification_confidence | "certain" / "high" / "none" | Confidence in the verdict |
| modification_markers | string[] | Named signals that triggered modified |
| xref_count | number | Number of cross-reference sections (edit sessions) |
| has_digital_signature | boolean | Document carries a digital signature |
| modifications_after_signature | boolean | Content added after the signature byte range |
| signature_removed | boolean | Signature slot present but signature stripped |
| creator | string \| null | Software that originally created the document |
| producer | string \| null | Software that last saved the document |
| creation_date | number \| null | Unix timestamp of declared creation date |
| modification_date | number \| null | Unix timestamp of declared modification date |
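If you prefer typed access over raw dicts, the reference above maps directly onto a dataclass. A minimal sketch — the field names follow the table, optional fields default to None, and undocumented fields are dropped so future API additions do not break parsing:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class AnalysisReport:
    id: str
    status: str                   # "intact" | "modified" | "inconclusive"
    modification_confidence: str  # "certain" | "high" | "none"
    modification_markers: list[str]
    xref_count: int
    has_digital_signature: bool
    modifications_after_signature: bool
    signature_removed: bool
    creator: Optional[str] = None
    producer: Optional[str] = None
    creation_date: Optional[int] = None
    modification_date: Optional[int] = None

    @classmethod
    def from_json(cls, data: dict) -> "AnalysisReport":
        # Keep only documented fields; ignore anything the API adds later.
        known = set(cls.__dataclass_fields__)
        return cls(**{k: v for k, v in data.items() if k in known})
```

Call `AnalysisReport.from_json(result.json())` wherever the tutorial uses the raw dict.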
Testing with test keys
Every HTPBE plan includes a test API key (prefixed htpbe_test_). Test keys accept mock URLs and return deterministic responses — no live PDF required, no quota consumed.
```python
import os

import requests

TEST_KEY = os.environ["HTPBE_TEST_KEY"]
BASE_URL = "https://api.htpbe.tech/v1"
HEADERS = {"Authorization": f"Bearer {TEST_KEY}", "Content-Type": "application/json"}

# Predictable test fixtures
TEST_URLS = {
    "intact": "https://api.htpbe.tech/v1/test/clean.pdf",
    "modified_high": "https://api.htpbe.tech/v1/test/modified-high.pdf",
    "inconclusive": "https://api.htpbe.tech/v1/test/inconclusive.pdf",
    "sig_removed": "https://api.htpbe.tech/v1/test/signature-removed.pdf",
    "modified_after_sig": "https://api.htpbe.tech/v1/test/modified-medium.pdf",
}

def test_all_verdict_branches():
    for scenario, url in TEST_URLS.items():
        submit = requests.post(f"{BASE_URL}/analyze", headers=HEADERS, json={"url": url}, timeout=30)
        submit.raise_for_status()
        check_id = submit.json()["id"]

        result = requests.get(f"{BASE_URL}/result/{check_id}", headers=HEADERS, timeout=30)
        result.raise_for_status()
        data = result.json()
        print(f"{scenario:20s} → status={data['status']}, confidence={data['modification_confidence']}")

if __name__ == "__main__":
    test_all_verdict_branches()
```

Use your test key in .env.test and your live key in .env — keep them in separate files so test traffic never reaches production routing.
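You can also make the key choice explicit in code. This sketch assumes an APP_ENV variable of your own naming, plus the HTPBE_API_KEY and HTPBE_TEST_KEY names used throughout this tutorial; adapt it to whatever config loader you use:

```python
import os

def htpbe_key() -> str:
    """Use the live key only in production; every other environment gets the test key."""
    if os.environ.get("APP_ENV") == "production":
        return os.environ["HTPBE_API_KEY"]
    return os.environ["HTPBE_TEST_KEY"]
```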
What this approach does not detect
Forensic metadata analysis catches the common case: someone downloaded a legitimate PDF and edited it with a standard tool. It does not catch:
Documents fabricated from scratch using institutional tools. If an attacker generates a bank statement using the same software a real bank uses, sets plausible timestamps, and produces a structurally clean PDF, the document may pass analysis. This requires access to institutional software and deliberate counter-forensic effort — uncommon in fraud at scale.
Strongly encrypted PDFs. Encryption prevents reading the structural content. These return inconclusive. For document types that should never be encrypted (bank statements, payslips), treat inconclusive from an encrypted file the same as modified.
For the fraud patterns that account for the vast majority of real document manipulation — editing existing PDFs with consumer tools like iLovePDF, Smallpdf, or Adobe Reader — the forensic approach detects them reliably.
Who should integrate PDF modification detection
If your application accepts PDFs from untrusted parties — loan applicants, tenants, job candidates, insurance claimants — and you need a programmatic signal before human review, this integration adds a forensic layer in under 50 lines of Python.
The HTPBE API is available on self-serve plans from $15/month. Get your API key — free test keys are included on every plan, so you can build and test your integration before your first live check.