PDF Security Blog

PDF Tamper Detection in Python: Integrate in Under 50 Lines

HTPBE Team · 10 min read

This article is a snapshot — content was accurate as of May 2026 (code examples tested against the API as of April 2026). The product evolves actively; specific counts, examples, and detection rules may have changed since publication — see the changelog for the current state.

Your application accepts PDFs from users — loan applications, identity documents, invoices, contracts. Any of them may have been edited after the issuing institution generated them. You have no original to compare against, and manual visual inspection does not scale.

This tutorial shows you how to integrate the HTPBE API into a Python application to detect post-creation modifications forensically. By the end you will have production-ready code for single-document checks, verdict routing, batch processing, and async poll loops.

How PDF tamper detection works

Submit a PDF URL via POST /v1/analyze. Get back a check ID. Retrieve the verdict with GET /v1/result/{id}. The API analyzes the file’s internal structure — metadata timestamps, cross-reference tables, digital signatures, producer/creator fields — and returns one of three verdicts: intact, modified, or inconclusive.

See the how it works page for a deeper explanation of the forensic layers. No file upload required, no SDK to install. Standard HTTP.

Prerequisites

pip install requests

Get your API key at htpbe.tech/auth/signup. Every plan includes a free test key you can use immediately without consuming live quota.

Set it as an environment variable:

export HTPBE_API_KEY="your_api_key_here"

Step 1 — Basic integration

The minimal integration: submit a URL, retrieve the result, print the verdict.

import os
import time
import requests

API_KEY = os.environ["HTPBE_API_KEY"]
BASE_URL = "https://api.htpbe.tech/v1"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}


def check_pdf(pdf_url: str) -> dict:
    """Submit a PDF URL for forensic analysis and return the result."""
    # Step 1: submit the URL
    submit = requests.post(
        f"{BASE_URL}/analyze",
        headers=HEADERS,
        json={"url": pdf_url},
        timeout=30,
    )
    submit.raise_for_status()
    check_id = submit.json()["id"]

    # Step 2: retrieve the verdict
    result = requests.get(
        f"{BASE_URL}/result/{check_id}",
        headers=HEADERS,
        timeout=30,
    )
    result.raise_for_status()
    return result.json()


if __name__ == "__main__":
    report = check_pdf("https://your-storage.example.com/documents/statement.pdf")
    print(report["status"])  # intact | modified | inconclusive

The API is synchronous for most documents. Submit and retrieve in two sequential requests. Analysis typically completes in under two seconds.


Step 2 — Handle all three verdicts

A production integration must act differently on each verdict. The routing function below also reads the named markers, producer, and xref_count from the response.

def route_document(pdf_url: str, document_type: str = "generic") -> dict:
    """
    Analyze a PDF and return a routing decision.

    Returns a dict with keys:
      - action:   "accept" | "reject" | "review"
      - check_id: str  (store this for audit purposes)
      - reason:   str
    """
    report = check_pdf(pdf_url)
    status = report["status"]
    check_id = report["id"]
    markers = report.get("modification_markers", [])
    producer = report.get("producer") or "unknown"
    xref_count = report.get("xref_count", 1)

    if status == "intact":
        return {
            "action": "accept",
            "check_id": check_id,
            "reason": "Document structure is consistent with original issuance.",
        }

    if status == "modified":
        reason_parts = [f"Forensic markers: {', '.join(markers)}."]
        if xref_count > 1:
            reason_parts.append(f"File was saved {xref_count} times after creation.")
        if producer != "unknown":
            reason_parts.append(f"Last processed by: {producer}.")
        return {
            "action": "reject",
            "check_id": check_id,
            "reason": " ".join(reason_parts),
        }

    # status == "inconclusive"
    # The document was created with consumer software and lacks institutional metadata.
    # For documents that should come from banks, tax authorities, or payroll systems,
    # inconclusive is a strong signal of fraud — treat it accordingly.
    institutional_types = {"bank_statement", "tax_document", "payslip", "insurance_policy"}
    if document_type in institutional_types:
        return {
            "action": "reject",
            "check_id": check_id,
            "reason": (
                f"Document origin ({producer}) is inconsistent with a {document_type}. "
                "Expected institutional software; consumer software detected."
            ),
        }

    return {
        "action": "review",
        "check_id": check_id,
        "reason": "Document created with consumer software. Route to manual review.",
    }

What inconclusive means

inconclusive does not mean the check failed. It means the document was created with consumer software — Microsoft Word, Google Docs, LibreOffice, Canva — and does not carry the structural fingerprint of an institutional document system.

For a user-uploaded CV or a self-drafted letter, this result is normal. For a document that claims to be a bank statement, payslip, or tax certificate, inconclusive is a strong fraud signal: real bank systems do not generate PDFs with Google Docs.

The modification_markers array

When status is modified, the modification_markers array names the specific signals that triggered the verdict:

Marker | What it means
INCREMENTAL_UPDATES | The file has more than one xref section: it was edited and re-saved after original creation
PRODUCER_MISMATCH | Creator and Producer fields name different tools (bank system vs. consumer editor)
DIFFERENT_DATES | Modification date is inconsistent with the claimed creation date
MODIFICATIONS_AFTER_SIGNATURE | Content was appended after a valid digital signature
SIGNATURE_REMOVED | A digital signature slot exists but the signature has been stripped

MODIFICATIONS_AFTER_SIGNATURE and SIGNATURE_REMOVED carry "certain" confidence, meaning the verdict is cryptographically verifiable. All other markers produce "high" confidence. For workflows where false positives are costly, you may want to auto-reject only "certain" markers and route "high" markers to manual review.

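The confidence split above can be turned into a small routing policy. A minimal sketch, using the marker names from the table; the helper name and the accept/review defaults are illustrative, not part of the API:

```python
# Markers whose verdicts are cryptographically verifiable ("certain").
CERTAIN_MARKERS = {"MODIFICATIONS_AFTER_SIGNATURE", "SIGNATURE_REMOVED"}


def route_by_confidence(report: dict) -> str:
    """Map a result payload to an action, auto-rejecting only on
    cryptographically verifiable markers."""
    status = report["status"]
    if status == "intact":
        return "accept"
    if status == "inconclusive":
        return "review"
    # status == "modified": reject outright only on "certain" markers;
    # send "high"-confidence markers to a human reviewer instead.
    markers = set(report.get("modification_markers", []))
    return "reject" if markers & CERTAIN_MARKERS else "review"
```

This keeps false positives out of the auto-reject path at the cost of more manual reviews; invert the trade-off by adding markers to the set.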

Step 3 — Batch processing

Process a list of PDF URLs, collect results, and filter the modified cases.

from concurrent.futures import ThreadPoolExecutor, as_completed


def batch_check(
    pdf_urls: list[str],
    document_type: str = "generic",
    max_workers: int = 5,
) -> list[dict]:
    """
    Run forensic checks on multiple PDFs in parallel.

    Returns a list of result dicts sorted by the original URL order.
    Each dict includes the URL, routing decision, and raw report fields.
    """
    results: dict[str, dict] = {}

    def check_one(url: str) -> tuple[str, dict]:
        try:
            decision = route_document(url, document_type)
            # Fetch the raw report by check_id instead of calling check_pdf
            # again: a second call would submit and analyze the document twice.
            report_resp = requests.get(
                f"{BASE_URL}/result/{decision['check_id']}",
                headers=HEADERS,
                timeout=30,
            )
            report_resp.raise_for_status()
            report = report_resp.json()
            return url, {
                "url": url,
                "action": decision["action"],
                "check_id": decision["check_id"],
                "reason": decision["reason"],
                "status": report["status"],
                "markers": report.get("modification_markers", []),
                "producer": report.get("producer"),
                "xref_count": report.get("xref_count"),
                "error": None,
            }
        except requests.HTTPError as exc:
            return url, {
                "url": url,
                "action": "error",
                "check_id": None,
                "reason": f"HTTP {exc.response.status_code}",
                "status": None,
                "markers": [],
                "producer": None,
                "xref_count": None,
                "error": str(exc),
            }
        except Exception as exc:
            return url, {
                "url": url,
                "action": "error",
                "check_id": None,
                "reason": str(exc),
                "status": None,
                "markers": [],
                "producer": None,
                "xref_count": None,
                "error": str(exc),
            }

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(check_one, url): url for url in pdf_urls}
        for future in as_completed(futures):
            url, result = future.result()
            results[url] = result

    # preserve original order
    ordered = [results[url] for url in pdf_urls]
    return ordered


def filter_flagged(results: list[dict]) -> list[dict]:
    """Return only rejected documents for downstream handling."""
    return [r for r in results if r["action"] == "reject"]


# Example usage
if __name__ == "__main__":
    urls = [
        "https://your-storage.example.com/docs/statement-jan.pdf",
        "https://your-storage.example.com/docs/statement-feb.pdf",
        "https://your-storage.example.com/docs/statement-mar.pdf",
    ]

    all_results = batch_check(urls, document_type="bank_statement")
    flagged = filter_flagged(all_results)

    print(f"Checked: {len(all_results)}, Flagged: {len(flagged)}")
    for doc in flagged:
        print(f"  REJECT {doc['url']}")
        print(f"         Markers: {doc['markers']}")
        print(f"         Producer: {doc['producer']}")

Keep max_workers at 5 or below. The API enforces rate limits per plan — parallel requests that exceed your plan’s concurrency limit will receive 429 responses.


Step 4 — Async poll pattern (advanced)

For document intake pipelines where you submit files at one time and process results later — for example, when a user uploads a document during onboarding and your system checks it in the background — the submit-then-poll pattern decouples ingestion from analysis.

import os
import time
import requests
from dataclasses import dataclass


API_KEY = os.environ["HTPBE_API_KEY"]
BASE_URL = "https://api.htpbe.tech/v1"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}


@dataclass
class PendingCheck:
    check_id: str
    pdf_url: str
    submitted_at: float


def submit_for_analysis(pdf_url: str) -> PendingCheck:
    """Submit a PDF URL and return a handle for later retrieval."""
    response = requests.post(
        f"{BASE_URL}/analyze",
        headers=HEADERS,
        json={"url": pdf_url},
        timeout=30,
    )
    response.raise_for_status()
    check_id = response.json()["id"]
    return PendingCheck(
        check_id=check_id,
        pdf_url=pdf_url,
        submitted_at=time.time(),
    )


def poll_result(
    pending: PendingCheck,
    poll_interval: float = 1.0,
    timeout: float = 30.0,
) -> dict:
    """
    Poll for a result until it is ready or the timeout expires.

    In practice the API returns synchronously, so the first poll
    almost always succeeds. This loop handles edge cases and network
    retries for long-running queues.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        response = requests.get(
            f"{BASE_URL}/result/{pending.check_id}",
            headers=HEADERS,
            timeout=30,
        )
        if response.status_code == 200:
            return response.json()
        if response.status_code == 404:
            # Not ready yet — wait and retry
            time.sleep(poll_interval)
            continue
        response.raise_for_status()

    raise TimeoutError(
        f"Result for {pending.check_id} not ready after {timeout}s"
    )


# Pipeline pattern: submit a batch, then retrieve when convenient
def intake_pipeline(urls: list[str]) -> list[dict]:
    # Phase 1: submit all documents
    pending_checks = []
    for url in urls:
        try:
            pending = submit_for_analysis(url)
            pending_checks.append(pending)
            print(f"Submitted {url} → {pending.check_id}")
        except requests.HTTPError as exc:
            print(f"Submit failed for {url}: {exc}")

    # Phase 2: retrieve results (could be in a separate job/worker)
    results = []
    for pending in pending_checks:
        try:
            result = poll_result(pending)
            results.append(result)
            print(f"{pending.check_id}: {result['status']}")
        except (TimeoutError, requests.HTTPError) as exc:
            print(f"Retrieval failed for {pending.check_id}: {exc}")

    return results

Store PendingCheck objects in your database (or a Redis queue) between the submit and retrieve phases. The check_id is stable — you can retrieve results days after submission using GET /v1/result/{check_id}.
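Since PendingCheck is a flat dataclass, persisting it between the two phases is one line each way. A sketch of JSON round-tripping suitable for a Redis value or a text column; the function names are illustrative:

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class PendingCheck:
    check_id: str
    pdf_url: str
    submitted_at: float


def serialize_pending(pending: PendingCheck) -> str:
    """Encode a PendingCheck as JSON for storage."""
    return json.dumps(asdict(pending))


def deserialize_pending(raw: str) -> PendingCheck:
    """Rebuild a PendingCheck from its stored JSON form."""
    return PendingCheck(**json.loads(raw))
```

For example, `redis_client.rpush("htpbe:pending", serialize_pending(p))` in the submit phase, and `deserialize_pending(...)` in the retrieval worker.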


Full API response reference

A typical response from GET /v1/result/{id}:

{
  "id": "ck_9f4a2e1b-3d7c-4a8e-b1f2-9e0d3c5a7b8f",
  "status": "modified",
  "modification_confidence": "high",
  "modification_markers": ["PRODUCER_MISMATCH", "INCREMENTAL_UPDATES"],
  "xref_count": 3,
  "has_digital_signature": false,
  "modifications_after_signature": false,
  "signature_removed": false,
  "creator": "HSBC Document Service",
  "producer": "Smallpdf",
  "creation_date": 1764547200,
  "modification_date": 1742688000
}
Field | Type | Notes
status | "intact" / "modified" / "inconclusive" | Primary verdict
modification_confidence | "certain" / "high" / "none" | Confidence in the verdict
modification_markers | string[] | Named signals that triggered modified
xref_count | number | Number of cross-reference sections (edit sessions)
has_digital_signature | boolean | Document carries a digital signature
modifications_after_signature | boolean | Content added after the signature byte range
signature_removed | boolean | Signature slot present but signature stripped
creator | string or null | Software that originally created the document
producer | string or null | Software that last saved the document
creation_date | number or null | Unix timestamp of declared creation date
modification_date | number or null | Unix timestamp of declared modification date
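The two date fields are Unix timestamps, so comparing them is plain arithmetic. A sketch that renders the declared dates and flags a large creation-to-modification gap; the one-day threshold is an arbitrary illustration, not an API rule:

```python
from datetime import datetime, timezone


def describe_dates(report: dict, max_gap_days: float = 1.0) -> str:
    """Render the declared timestamps and flag a large gap between
    creation and modification."""
    created_ts = report.get("creation_date")
    modified_ts = report.get("modification_date")
    if created_ts is None or modified_ts is None:
        return "dates not declared"
    created = datetime.fromtimestamp(created_ts, tz=timezone.utc)
    modified = datetime.fromtimestamp(modified_ts, tz=timezone.utc)
    gap_days = (modified - created).total_seconds() / 86400
    flag = " (re-saved long after creation)" if gap_days > max_gap_days else ""
    return f"created {created:%Y-%m-%d}, modified {modified:%Y-%m-%d}{flag}"
```

A legitimate statement is usually generated and never touched again, so a gap of months between the declared dates is worth surfacing to a reviewer even when the verdict alone is inconclusive.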

Testing with test keys

Every HTPBE plan includes a test API key (prefixed htpbe_test_). Test keys accept mock URLs and return deterministic responses — no live PDF required, no quota consumed.

import os
import requests

TEST_KEY = os.environ["HTPBE_TEST_KEY"]
BASE_URL = "https://api.htpbe.tech/v1"
HEADERS = {"Authorization": f"Bearer {TEST_KEY}", "Content-Type": "application/json"}

# Predictable test fixtures
TEST_URLS = {
    "intact":            "https://api.htpbe.tech/v1/test/clean.pdf",
    "modified_high":     "https://api.htpbe.tech/v1/test/modified-high.pdf",
    "inconclusive":      "https://api.htpbe.tech/v1/test/inconclusive.pdf",
    "sig_removed":       "https://api.htpbe.tech/v1/test/signature-removed.pdf",
    "modified_after_sig":"https://api.htpbe.tech/v1/test/modified-medium.pdf",
}


def test_all_verdict_branches():
    for scenario, url in TEST_URLS.items():
        submit = requests.post(f"{BASE_URL}/analyze", headers=HEADERS, json={"url": url}, timeout=30)
        submit.raise_for_status()
        check_id = submit.json()["id"]

        result = requests.get(f"{BASE_URL}/result/{check_id}", headers=HEADERS, timeout=30)
        result.raise_for_status()
        data = result.json()

        print(f"{scenario:20s} → status={data['status']}, confidence={data['modification_confidence']}")


if __name__ == "__main__":
    test_all_verdict_branches()

Use your test key in .env.test and your live key in .env — keep them in separate files so test traffic never reaches production routing.
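One way to make that split mechanical is to select the key from the environment name, so a misconfigured worker fails loudly instead of silently sending test traffic to production. A sketch; the APP_ENV variable name is an assumption of this example, not an HTPBE convention:

```python
import os


def load_htpbe_key() -> str:
    """Pick the test or live key based on APP_ENV; fail fast if unset."""
    env = os.environ.get("APP_ENV", "production")
    var = "HTPBE_API_KEY" if env == "production" else "HTPBE_TEST_KEY"
    key = os.environ.get(var)
    if not key:
        raise RuntimeError(f"{var} is not set (APP_ENV={env})")
    # Guard against a live key leaking into a non-production environment.
    if env != "production" and not key.startswith("htpbe_test_"):
        raise RuntimeError(f"{var} does not look like a test key")
    return key
```

The prefix check relies on the documented `htpbe_test_` naming of test keys.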


What this approach does not detect

Forensic metadata analysis catches the common case: someone downloaded a legitimate PDF and edited it with a standard tool. It does not catch:

Documents fabricated from scratch using institutional tools. If an attacker generates a bank statement using the same software a real bank uses, sets plausible timestamps, and produces a structurally clean PDF, the document may pass analysis. This requires access to institutional software and deliberate counter-forensic effort — uncommon in fraud at scale.

Strongly encrypted PDFs. Encryption prevents reading the structural content. These return inconclusive. For document types that should never be encrypted (bank statements, payslips), treat inconclusive from an encrypted file the same as modified.

For the fraud patterns that account for the vast majority of real document manipulation — editing existing PDFs with consumer tools like iLovePDF, Smallpdf, or Adobe Reader — the forensic approach detects them reliably.


Who should integrate PDF modification detection

If your application accepts PDFs from untrusted parties — loan applicants, tenants, job candidates, insurance claimants — and you need a programmatic signal before human review, this integration adds a forensic layer in under 50 lines of Python.

The HTPBE API is available on self-serve plans from $15/month. Get your API key — free test keys are included on every plan, so you can build and test your integration before your first live check.

