Common Polling Patterns

Copy-paste recipes for the most frequent timeout-sampler use cases. Each recipe is self-contained and ready to drop into your project.

Note: All recipes assume you have already installed the package. See Getting Started with timeout-sampler for installation instructions.

Wait for an API to Become Ready

Poll an HTTP endpoint until it returns a successful response.

import requests
from timeout_sampler import TimeoutSampler

for sample in TimeoutSampler(
    wait_timeout=120,
    sleep=5,
    func=lambda: requests.get("http://localhost:8080/healthz").ok,
    exceptions_dict={requests.ConnectionError: [], requests.Timeout: []},
):
    if sample:
        break

The sampler calls the health-check endpoint every 5 seconds for up to 2 minutes. Connection errors and timeouts are silently retried thanks to exceptions_dict. The loop breaks as soon as the endpoint returns a 2xx response.

Tip: For long startup waits, increase wait_timeout and keep sleep between 2–10 seconds to avoid hammering the service.

Retry a Flaky Function with the @retry Decorator

Automatically re-run a function until it returns a truthy value.

from timeout_sampler import retry

@retry(wait_timeout=30, sleep=2)
def fetch_cluster_status():
    import requests
    resp = requests.get("https://api.example.com/cluster/status")
    resp.raise_for_status()
    return resp.json()["state"] == "ready"

# Raises TimeoutExpiredError after 30s if the cluster never reaches "ready"
fetch_cluster_status()

The @retry decorator wraps the function in a TimeoutSampler loop and returns the first truthy result. Use it when you want polling behavior without writing the iteration yourself.

  • The decorated function keeps its original signature — pass arguments as usual.
  • Any unhandled exception is immediately re-raised unless you add exceptions_dict.

See Retrying Functions with the @retry Decorator for full parameter details.

Poll with a Partial Function

Use functools.partial to poll a function that requires arguments without using func_args or keyword arguments.

from functools import partial
from timeout_sampler import TimeoutSampler

def check_pod_phase(namespace, pod_name):
    """Returns True when the pod is Running."""
    import subprocess, json
    result = subprocess.run(
        ["kubectl", "get", "pod", pod_name, "-n", namespace, "-o", "json"],
        capture_output=True, text=True,
    )
    pod = json.loads(result.stdout)
    return pod["status"]["phase"] == "Running"

poll_fn = partial(check_pod_phase, "default", "my-app-pod-7f4b9")

for sample in TimeoutSampler(wait_timeout=90, sleep=3, func=poll_fn):
    if sample:
        break

TimeoutSampler resolves partial objects automatically when building log output, so function names and modules are logged correctly even through the wrapper. This pattern keeps the sampler call clean when the polled function has many parameters.

Pass Arguments via func_args and Keyword Arguments

Provide positional and keyword arguments directly to TimeoutSampler without wrapping in partial.

from timeout_sampler import TimeoutSampler

def is_file_present(directory, filename, min_size_bytes=0):
    import os
    path = os.path.join(directory, filename)
    return os.path.isfile(path) and os.path.getsize(path) >= min_size_bytes

for sample in TimeoutSampler(
    wait_timeout=60,
    sleep=2,
    func=is_file_present,
    func_args=("/tmp/exports", "report.csv"),
    min_size_bytes=1024,
):
    if sample:
        break

Positional arguments go into func_args as a tuple. Keyword arguments are passed directly as extra kwargs to the TimeoutSampler constructor, which forwards them to func on every call.

Ignore All Instances of an Exception

Swallow every occurrence of a specific exception type during polling.

from timeout_sampler import TimeoutSampler

def get_resource():
    import json, urllib.request
    resp = urllib.request.urlopen("http://localhost:9090/resource")
    return json.loads(resp.read())

for sample in TimeoutSampler(
    wait_timeout=30,
    sleep=2,
    func=get_resource,
    exceptions_dict={ConnectionError: [], TimeoutError: []},
):
    if sample:
        break

An empty list [] next to an exception class means ignore all messages for that exception. The sampler will keep retrying regardless of the exception's text content.

See Filtering and Handling Exceptions for a full explanation of exceptions_dict.

Filter Exceptions by Message Text

Only ignore exceptions whose message matches specific substrings.

from timeout_sampler import TimeoutSampler

def query_database():
    import sqlite3
    conn = sqlite3.connect("/var/data/app.db")
    cursor = conn.execute("SELECT count(*) FROM jobs WHERE status = 'done'")
    count = cursor.fetchone()[0]
    conn.close()
    if count == 0:
        raise RuntimeError("no completed jobs yet")
    return count

for sample in TimeoutSampler(
    wait_timeout=60,
    sleep=5,
    func=query_database,
    exceptions_dict={RuntimeError: ["no completed jobs yet"]},
):
    if sample:
        print(f"Completed jobs: {sample}")
        break

The sampler checks whether the raised exception's string representation contains any of the listed substrings. If a RuntimeError is raised with a different message (e.g., "database locked"), it will not be caught — it will immediately raise a TimeoutExpiredError.

Warning: Message matching uses substring in checks, not exact equality. The filter "not found" will also match "resource not found in namespace".

Combine Multiple Exception Filters

Handle several exception types, each with independent message filters.

from timeout_sampler import TimeoutSampler

def provision_vm():
    """Calls a cloud API that may fail in multiple ways."""
    # ... cloud SDK call ...
    return {"id": "vm-abc123", "status": "running"}

for sample in TimeoutSampler(
    wait_timeout=300,
    sleep=10,
    func=provision_vm,
    exceptions_dict={
        ConnectionError: [],                          # retry on any connection issue
        TimeoutError: [],                             # retry on any timeout
        RuntimeError: ["quota exceeded", "retryable"],  # only retry these messages
        PermissionError: ["token expired"],            # only retry on expired tokens
    },
):
    if sample and sample.get("status") == "running":
        print(f"VM provisioned: {sample['id']}")
        break

Each exception class has its own message filter list. This lets you broadly retry transient network errors while being selective about application-level exceptions. Any exception type or message not listed will immediately surface as a TimeoutExpiredError.

See How Exception Matching Works for the inheritance-aware matching algorithm.

Leverage Exception Inheritance for Broad Matching

Catch a parent exception class to automatically cover all its subclasses.

from timeout_sampler import TimeoutSampler

class ServiceError(Exception):
    pass

class TransientError(ServiceError):
    pass

class RateLimitError(ServiceError):
    pass

def call_external_service():
    # ... API call that may raise TransientError or RateLimitError ...
    return True

for sample in TimeoutSampler(
    wait_timeout=60,
    sleep=3,
    func=call_external_service,
    exceptions_dict={ServiceError: []},
):
    if sample:
        break

Listing ServiceError in exceptions_dict catches both TransientError and RateLimitError because TimeoutSampler uses isinstance() to match exceptions. You don't need to enumerate every subclass individually.

Tip: Use broad parent-class matching for exception hierarchies you control, and specific-class matching for third-party exceptions where you want precise control.

Wait for a Return Value to Match a Condition

Poll until the function returns a specific value, not just a truthy one.

from timeout_sampler import TimeoutSampler

def get_deployment_replicas():
    """Returns the current number of ready replicas."""
    import subprocess, json
    result = subprocess.run(
        ["kubectl", "get", "deployment", "web-api", "-o", "json"],
        capture_output=True, text=True,
    )
    deploy = json.loads(result.stdout)
    return deploy["status"].get("readyReplicas", 0)

desired_replicas = 3

for sample in TimeoutSampler(wait_timeout=120, sleep=5, func=get_deployment_replicas):
    if sample == desired_replicas:
        break

The if condition inside the loop is your match logic — you can check equality, membership, ranges, or any predicate. The sampler itself only yields; your code decides what constitutes success.

Silence All Log Output

Disable logging for test suites or inner loops where verbosity is unwanted.

from timeout_sampler import TimeoutSampler

for sample in TimeoutSampler(
    wait_timeout=10,
    sleep=1,
    func=lambda: True,
    print_log=False,
    print_func_log=False,
):
    if sample:
        break

Setting print_log=False suppresses elapsed-time messages, and print_func_log=False suppresses function call details. Use both together for completely silent polling.

See Controlling Log Output for fine-grained logging options including print_func_args.

Track Remaining Time Across Multiple Polling Steps

Use TimeoutWatch to share a single time budget across sequential polling operations.

from timeout_sampler import TimeoutSampler, TimeoutWatch

overall_timeout = TimeoutWatch(timeout=120)

# Step 1: Wait for database
for sample in TimeoutSampler(
    wait_timeout=overall_timeout.remaining_time(),
    sleep=3,
    func=lambda: __import__("os").path.exists("/tmp/db.ready"),
):
    if sample:
        break

# Step 2: Wait for cache (uses remaining time from same budget)
for sample in TimeoutSampler(
    wait_timeout=overall_timeout.remaining_time(),
    sleep=2,
    func=lambda: __import__("os").path.exists("/tmp/cache.ready"),
):
    if sample:
        break

print(f"Both ready with {overall_timeout.remaining_time():.1f}s to spare")

TimeoutWatch.remaining_time() returns the seconds left from the original timeout, automatically accounting for elapsed time. Pass it as wait_timeout to give each subsequent step only the remaining budget.

See Tracking Elapsed Time with TimeoutWatch for the full TimeoutWatch API.

Catch TimeoutExpiredError and Inspect the Last Exception

Access diagnostic information when polling fails.

from timeout_sampler import TimeoutExpiredError, TimeoutSampler

def unstable_lookup():
    raise ConnectionError("connection refused on port 5432")

try:
    for sample in TimeoutSampler(
        wait_timeout=10,
        sleep=2,
        func=unstable_lookup,
        exceptions_dict={ConnectionError: []},
    ):
        if sample:
            break
except TimeoutExpiredError as e:
    print(f"Polling failed: {e}")
    print(f"Last exception type: {type(e.last_exp).__name__}")  # ConnectionError
    print(f"Last exception message: {e.last_exp}")              # connection refused on port 5432
    print(f"Total elapsed time: {e.elapsed_time}s")

TimeoutExpiredError exposes last_exp (the last exception raised by the polled function) and elapsed_time (total seconds spent polling). Use these for detailed error reporting or conditional recovery logic.

See TimeoutExpiredError Reference for all available attributes.