Setting up Google OR-Tools for Waste Collection Route Optimization & Compliance Logging

Environment isolation, callback signatures, and dimension binding patterns for stable solver runs.

Deploying deterministic routing for municipal waste operations requires strict environment isolation and reproducible solver states. Provision a Python 3.10+ virtual environment with pinned dependency manifests. Install ortools via pip, ensuring C++ backend binaries match your host architecture (manylinux2014_x86_64 or macosx_11_0_universal2). Verify the installation by importing pywrapcp and checking the solver version against your deployment manifest.
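
The version check described above can be sketched with the standard library alone. This is a minimal sketch, not a definitive implementation: the `PINNED` mapping and the `check_pins` helper are hypothetical names introduced for illustration, and the pin simply mirrors the ortools version used in this guide.

```python
from importlib import metadata

# Hypothetical pin list mirroring a deployment manifest.
PINNED = {"ortools": "9.7.2996"}


def check_pins(pins):
    """Return {package: (expected, found)} for every pin that does not match.

    `found` is None when the package is not installed in this environment.
    """
    mismatches = {}
    for name, expected in pins.items():
        try:
            found = metadata.version(name)
        except metadata.PackageNotFoundError:
            found = None
        if found != expected:
            mismatches[name] = (expected, found)
    return mismatches


if __name__ == "__main__":
    problems = check_pins(PINNED)
    print(problems or "all pins match")
```

Running a check like this at container start-up, before the first solve, surfaces dependency drift early instead of as a solver discrepancy later.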

Real-world fleet telemetry introduces GPS drift, missing bin weights, and asynchronous timestamps. Implement a preprocessing layer that interpolates missing coordinates using Kalman filtering and normalizes service durations against historical dwell-time distributions. This cleaned dataset feeds directly into the distance matrix builder for the VRP Route Optimization Algorithms framework. The following workflow demonstrates a single production scenario: enforcing municipal time windows alongside strict capacity limits, with automated fallback routing and immutable compliance logging.
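
To make the coordinate-filling step concrete, here is a minimal sketch of a scalar Kalman filter applied to one coordinate axis. It assumes the simplest possible motion model (a random walk), so a missing reading is filled with the last filtered estimate rather than a true interpolation between neighbours. The function name `kalman_fill` and both variance defaults are illustrative assumptions, not values taken from any fleet dataset.

```python
def kalman_fill(observations, process_var=1e-10, measurement_var=1e-8):
    """Filter one coordinate axis; None entries are predicted, not updated.

    Assumes a random-walk state model. Leading None values stay None because
    there is no earlier reading to initialise the filter from.
    """
    estimates = []
    x = None  # current state estimate
    p = 0.0   # current estimate variance
    for z in observations:
        if x is None:
            if z is None:
                estimates.append(None)
                continue
            # Initialise from the first available reading.
            x, p = z, measurement_var
        else:
            # Predict: the state is unchanged, uncertainty grows.
            p += process_var
            if z is not None:
                # Update: blend prediction and measurement by the Kalman gain.
                gain = p / (p + measurement_var)
                x += gain * (z - x)
                p *= 1 - gain
        estimates.append(x)
    return estimates


if __name__ == "__main__":
    latitudes = [40.7140, None, 40.7142, 40.7155]
    print(kalman_fill(latitudes))
```

Latitude and longitude would each be passed through separately. A constant-velocity model with timestamps would track a moving truck better. The random-walk form is shown only because it fits in a few lines.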

Environment & Dependency Pinning

python -m venv .venv
source .venv/bin/activate
pip install ortools==9.7.2996 pandas==2.1.4 numpy==1.26.2 structlog==23.2.0

Core Constraint Workflow: Time Windows + Capacity with Compliance Logging

The implementation below isolates the routing model, applies hard capacity and time-window constraints, executes with a deterministic seed, and triggers a capacity-relaxed fallback if the primary solve fails. All solver states and route assignments are serialized into structured JSON for audit reconstruction.

import json
import hashlib
import logging
import tracemalloc
from ortools.constraint_solver import routing_enums_pb2, pywrapcp

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("waste_routing_compliance")

# Realistic mock telemetry payload
TELEMETRY_PAYLOAD = [
    {"stop_id": "DEPOT_01", "lat": 40.7128, "lon": -74.0060, "weight_kg": 0,   "window": [0, 1440]},
    {"stop_id": "BIN_A1",   "lat": 40.7140, "lon": -74.0055, "weight_kg": 185, "window": [480, 720]},
    {"stop_id": "BIN_A2",   "lat": 40.7155, "lon": -74.0040, "weight_kg": 210, "window": [480, 720]},
    {"stop_id": "BIN_B1",   "lat": 40.7130, "lon": -74.0080, "weight_kg": 195, "window": [720, 960]},
    {"stop_id": "BIN_B2",   "lat": 40.7110, "lon": -74.0095, "weight_kg": 220, "window": [720, 960]},
]

TRUCK_CAPACITY_KG = 800
SPEED_M_PER_MIN = 500  # Urban grid average (~30 km/h)
SERVICE_TIME_MIN = 3


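def build_matrices(stops):
    """Build integer distance (m) and travel-time (min) matrices from stop coordinates."""
    n = len(stops)
    dist_matrix = [[0] * n for _ in range(n)]
    time_matrix = [[0] * n for _ in range(n)]
    for i in range(n):
        for j in range(n):
            if i != j:
                # Planar approximation: 111,320 m per degree on both axes.
                # Longitude is not scaled by cos(latitude), so east-west
                # distances are overestimated away from the equator.
                d = 111_320 * ((stops[i]["lat"] - stops[j]["lat"]) ** 2 +
                               (stops[i]["lon"] - stops[j]["lon"]) ** 2) ** 0.5
                dist_matrix[i][j] = int(d)
                # Travel time plus a fixed service allowance (also applied to
                # legs leaving the depot).
                time_matrix[i][j] = int(d / SPEED_M_PER_MIN) + SERVICE_TIME_MIN
    return dist_matrix, time_matrix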


def solve_waste_vrp(truck_capacity_kg: int = TRUCK_CAPACITY_KG):
    """Solve VRP with capacity and time-window constraints.

    The capacity parameter is accepted explicitly so the fallback can pass a
    relaxed value without mutating shared state — RoutingDimension has no
    public SetCapacity method in the OR-Tools Python API.
    """
    tracemalloc.start()
    dist_matrix, time_matrix = build_matrices(TELEMETRY_PAYLOAD)
    manager = pywrapcp.RoutingIndexManager(len(TELEMETRY_PAYLOAD), 1, 0)
    routing = pywrapcp.RoutingModel(manager)

    # Distance callback
    transit_callback_index = routing.RegisterTransitCallback(
        lambda fi, ti: dist_matrix[manager.IndexToNode(fi)][manager.IndexToNode(ti)]
    )
    routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

    # Capacity dimension (strict payload limit)
    demands = [s["weight_kg"] for s in TELEMETRY_PAYLOAD]
    demand_callback = routing.RegisterUnaryTransitCallback(
        lambda fi: demands[manager.IndexToNode(fi)]
    )
    routing.AddDimensionWithVehicleCapacity(
        demand_callback, 0, [truck_capacity_kg], True, "Capacity"
    )

    # Time window dimension (municipal noise ordinance)
    time_callback = routing.RegisterTransitCallback(
        lambda fi, ti: time_matrix[manager.IndexToNode(fi)][manager.IndexToNode(ti)]
    )
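    # Arguments: transit callback, max waiting slack (30 min), horizon (1440 min),
    # fix_start_cumul_to_zero=False so the truck may leave the depot after minute 0.
    routing.AddDimension(time_callback, 30, 1440, False, "Time")
    time_dimension = routing.GetDimensionOrDie("Time")
    for node, stop in enumerate(TELEMETRY_PAYLOAD):
        index = manager.NodeToIndex(node)
        time_dimension.CumulVar(index).SetRange(stop["window"][0], stop["window"][1])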

    # Solver configuration
    search_parameters = pywrapcp.DefaultRoutingSearchParameters()
    search_parameters.first_solution_strategy = routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
    search_parameters.local_search_metaheuristic = routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
    search_parameters.time_limit.seconds = 15
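    # RoutingSearchParameters does not appear to define a random_seed field, and
    # assigning an unknown protobuf field raises AttributeError. Reseed the
    # underlying CP solver instead.
    routing.solver().ReSeed(42)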

    solution = routing.SolveWithParameters(search_parameters)
    return routing, manager, solution


def extract_compliance_manifest(routing, manager, solution, fallback: bool = False):
    route_data = []
    total_load = 0
    for vehicle_id in range(manager.GetNumberOfVehicles()):
        index = routing.Start(vehicle_id)
        route_stops = []
        while not routing.IsEnd(index):
            node = manager.IndexToNode(index)
            route_stops.append(TELEMETRY_PAYLOAD[node])
            total_load += TELEMETRY_PAYLOAD[node]["weight_kg"]
            index = solution.Value(routing.NextVar(index))
        route_data.append({"vehicle_id": vehicle_id, "stops": route_stops})

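    manifest = {
        # Placeholder timestamp; stamp the actual solve time (UTC) in production.
        "timestamp": "2024-03-15T06:30:00Z",
        # Local search under a time limit yields a feasible route, not a proven optimum.
        "solver_status": "FEASIBLE" if not fallback else "FALLBACK_ACTIVE",
        "total_payload_kg": total_load,
        # Example threshold in kg; substitute the limit that applies to your jurisdiction.
        "epa_threshold_exceeded": total_load > 4000,
        "routes": route_data,
        "fallback_triggered": fallback,
    }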
    manifest_json = json.dumps(manifest, sort_keys=True)
    manifest["sha256_hash"] = hashlib.sha256(manifest_json.encode()).hexdigest()
    return manifest


def run():
    # Primary solve
    routing, manager, solution = solve_waste_vrp(TRUCK_CAPACITY_KG)
    if solution:
        manifest = extract_compliance_manifest(routing, manager, solution)
        logger.info(json.dumps({"event": "route_optimized", "payload": manifest}))
        return manifest

    logger.warning(json.dumps({"event": "solver_failure", "error": "Primary solver failed to converge"}))

    # Fallback: rebuild the model with a 25% capacity relaxation.
    # OR-Tools RoutingDimension has no runtime SetCapacity method — the model
    # must be reconstructed with the new vehicle_capacities value.
    relaxed_capacity = int(TRUCK_CAPACITY_KG * 1.25)
    routing_fb, manager_fb, solution_fb = solve_waste_vrp(relaxed_capacity)
    if solution_fb:
        logger.info(json.dumps({"event": "fallback_capacity_relaxed", "status": "success"}))
        return extract_compliance_manifest(routing_fb, manager_fb, solution_fb, fallback=True)

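    # No greedy assignment is implemented in this example; the event and the
    # sentinel return value mark where one would be invoked.
    logger.critical(json.dumps({"event": "fallback_greedy_assignment", "status": "triggered"}))
    return {"status": "GREEDY_FALLBACK", "hash": "N/A"}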


if __name__ == "__main__":
    run()
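    # tracemalloc tracks Python-level allocations only; memory allocated inside
    # the C++ solver is not included in this figure.
    _, peak = tracemalloc.get_traced_memory()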
    tracemalloc.stop()
    logger.info(json.dumps({"metric": "solver_memory_peak_mb", "value": round(peak / 10**6, 2)}))
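
The build_matrices function above treats a degree of longitude as the same length as a degree of latitude. Where that error matters, a great-circle distance is a common substitute. The following is a minimal sketch using the haversine formula; the helper name `haversine_m` and the mean Earth radius constant are assumptions introduced here, not part of the workflow above.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in metres (assumed constant)


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


if __name__ == "__main__":
    # Depot to BIN_A1 from the sample payload.
    print(round(haversine_m(40.7128, -74.0060, 40.7140, -74.0055)))
```

Straight-line distance still ignores the street network. For production routing, a road-network distance matrix is usually the better input, with a function like this as a fallback when that service is unavailable.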

Constraint Validation & Compliance Alignment

Municipal regulations require immutable records of route assignments and vehicle loads. The extract_compliance_manifest function maps each stop to the original telemetry payload, flags whether the total payload exceeds the reporting threshold (4,000 kg in this example), and attaches a SHA-256 hash of the key-sorted JSON serialization so that any later modification can be detected. Pushing the manifest to the compliance data lake is left to the caller. The exact API signatures for dimension registration and callback registration are documented in the OR-Tools Implementation reference.
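
An auditor can check the hash by recomputing it over the manifest with the hash field removed. The sketch below assumes the same convention as extract_compliance_manifest (key-sorted JSON, hash stored under `sha256_hash`). The helper names `seal` and `verify` are hypothetical.

```python
import hashlib
import json


def seal(manifest):
    """Return a copy of the manifest with a SHA-256 hash of its sorted-key JSON."""
    body = {k: v for k, v in manifest.items() if k != "sha256_hash"}
    digest = hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()
    return {**body, "sha256_hash": digest}


def verify(manifest):
    """True if the stored hash matches a fresh hash of the remaining fields."""
    stored = manifest.get("sha256_hash")
    return stored is not None and seal(manifest)["sha256_hash"] == stored


if __name__ == "__main__":
    sealed = seal({"total_payload_kg": 810, "routes": []})
    print(verify(sealed))                                # True
    print(verify({**sealed, "total_payload_kg": 700}))   # False
```

A bare hash detects accidental or naive modification only. Anyone able to rewrite the record can also recompute the hash, so storing the digest separately or signing it is the usual next step.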

When the primary solve returns no solution, the fallback rebuilds the routing model with a relaxed capacity ceiling. In the sample payload the four bins total 810 kg against an 800 kg truck, so the primary solve is infeasible by construction and the 1,000 kg fallback path runs. Rebuilding is necessary because RoutingDimension does not expose a SetCapacity method after model construction; the vehicle capacity list is registered once via AddDimensionWithVehicleCapacity and cannot be mutated at runtime. Each fallback activation emits a structured JSON event. Include the violated constraint's context (capacity and total demand) in that event so audits can be reconstructed without manual log parsing.
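
One way to attach constraint context to a fallback event is to compute it from the demands before rebuilding the model. This is a minimal sketch: the function name `capacity_context` and the field names are illustrative assumptions, and the 1.25 factor matches the relaxation used in the workflow above.

```python
import json


def capacity_context(demands_kg, capacity_kg, relax_factor=1.25):
    """Summarise why a single-vehicle capacity constraint may have failed."""
    total = sum(demands_kg)
    relaxed = int(capacity_kg * relax_factor)
    return {
        "event": "fallback_capacity_relaxed",
        "total_demand_kg": total,
        "capacity_kg": capacity_kg,
        "shortfall_kg": max(0, total - capacity_kg),
        "relaxed_capacity_kg": relaxed,
        "relaxed_capacity_sufficient": total <= relaxed,
    }


if __name__ == "__main__":
    # Demands from the sample telemetry payload (depot first).
    print(json.dumps(capacity_context([0, 185, 210, 195, 220], 800), sort_keys=True))
```

With the sample payload this reports a 10 kg shortfall at 800 kg and confirms that the relaxed 1,000 kg ceiling covers the 810 kg total. The check only covers the single-vehicle case, where total demand must fit on one truck.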

Isolation & Observability

Isolate solver failures by logging the full constraint configuration at dispatch time. For successful solves, routing.WriteAssignment() persists the solution so it can be reloaded later with routing.ReadAssignment(). Read the cumulative variables of routing.GetDimensionOrDie("Capacity") along a route to pinpoint where the load approaches the limit. Fix the solver seed and keep inputs identical to reproduce routing anomalies, bearing in mind that a wall-clock time limit can still end the search at different points on different hardware. Validate matrix symmetry and triangle inequality compliance to prevent artificial route inflation.
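
The matrix checks mentioned above can be sketched in a few lines of plain Python. The helper names `asymmetric_pairs` and `triangle_violations` are hypothetical. The `tolerance` parameter is an assumption added because integer truncation in a matrix builder can produce off-by-one violations that are not real data errors.

```python
def asymmetric_pairs(matrix, tolerance=0):
    """Return (i, j) pairs where matrix[i][j] and matrix[j][i] differ by more than tolerance."""
    n = len(matrix)
    return [(i, j)
            for i in range(n) for j in range(i + 1, n)
            if abs(matrix[i][j] - matrix[j][i]) > tolerance]


def triangle_violations(matrix, tolerance=0):
    """Return (i, k, j) triples where going i -> k -> j is shorter than i -> j."""
    n = len(matrix)
    return [(i, k, j)
            for i in range(n) for j in range(n) for k in range(n)
            if len({i, j, k}) == 3
            and matrix[i][j] > matrix[i][k] + matrix[k][j] + tolerance]


if __name__ == "__main__":
    suspicious = [[0, 1, 10],
                  [1, 0, 1],
                  [10, 1, 0]]
    print(asymmetric_pairs(suspicious))     # []
    print(triangle_violations(suspicious))  # [(0, 1, 2), (2, 1, 0)]
```

The triangle check is cubic in the number of stops, which is fine for a per-route matrix but worth sampling for city-wide ones. One-way streets make real road matrices legitimately asymmetric, so treat asymmetry as a prompt to inspect the data rather than as an error in itself.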

Containerize the routing service with explicit CPU pinning (taskset or Kubernetes cpuManagerPolicy: static) and memory cgroups. Expose Prometheus metrics for solve latency, fallback frequency, and constraint violation rates. Schedule automated regression tests against synthetic noise profiles to validate solver stability before production deployment. For detailed memory profiling techniques, reference the official Python tracemalloc documentation.
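
A synthetic noise profile for regression tests can be as simple as seeded Gaussian jitter on the stop coordinates. The sketch below is one possible shape for such a generator. The name `jitter_stops` and the default sigma (0.0001 degrees, roughly 11 m of latitude) are assumptions chosen for illustration, not measured GPS error.

```python
import random


def jitter_stops(stops, sigma_deg=0.0001, seed=0):
    """Return copies of the stops with Gaussian noise added to lat/lon.

    A dedicated random.Random instance keeps the noise reproducible per seed
    without touching the global random state. Input dicts are not mutated.
    """
    rng = random.Random(seed)
    noisy = []
    for stop in stops:
        noisy.append({
            **stop,
            "lat": stop["lat"] + rng.gauss(0, sigma_deg),
            "lon": stop["lon"] + rng.gauss(0, sigma_deg),
        })
    return noisy


if __name__ == "__main__":
    sample = [{"stop_id": "BIN_A1", "lat": 40.7140, "lon": -74.0055}]
    print(jitter_stops(sample, seed=1))
```

A regression suite could solve the same payload under several seeds and assert that every run returns a solution and that total route distance stays within an agreed band of the noise-free baseline.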