配送调度代码的实现方式和逻辑
配送调度系统通过多因素评分和匈牙利算法实现全局最优分配。系统架构包含订单数据、配送员信息和环境因素三个模块。订单模型记录位置、重量、紧急度等属性;配送员模型包含位置、速度、疲劳度等状态。核心算法包括:1)Haversine公式计算球面距离;2)考虑环境因素的时间估算;3)多因素加权评分(距离30%、时间25%、配送员质量20%、疲劳度15%)。系统最终通过匈牙利算法将订单与配送员进行最优匹配,实现高效配送调度。


一、简单架构

┌─────────────────────────────────────────────────────────────────┐
│                      配送调度系统                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────┐    ┌──────────────┐    ┌─────────────────────┐  │
│  │ 订单数据  │───▶│ 多因素评分   │───▶│ 匈牙利算法全局最优  │  │
│  │ (Order)  │    │   模块       │    │     分配            │  │
│  └──────────┘    └──────────────┘    └─────────────────────┘  │
│       │                 │                        │              │
│       │                 │                        │              │
│  ┌──────────┐    ┌──────────────┐    ┌─────────────────────┐  │
│  │ 配送员    │───▶│ 疲劳度/质量  │───▶│   最终分配结果      │  │
│  │ (Courier)│    │   评估       │    │ {订单:配送员}       │  │
│  └──────────┘    └──────────────┘    └─────────────────────┘  │
│                                                                 │
│  ┌──────────┐                                                   │
│  │ 环境因素  │───▶ 影响时间估算和配送效率                         │
│  │ (天气/交通)                                                 │
│  └──────────┘                                                   │
└─────────────────────────────────────────────────────────────────┘

二、数据模型

2.1 订单 (Order)

@dataclass
class Order:
    order_id: str           # 订单唯一标识
    pickup_lat/lng: float   # 取货地坐标
    dropoff_lat/lng: float  # 送货地坐标
    weight: float           # 货物重量(kg)
    urgency: int            # 紧急度 1-5
    create_time: int        # 订单创建时间(秒)
    deadline: int           # 最晚送达时间(秒)
    fee: float              # 配送费

真实场景含义

  • 紧急度5:写字楼/医院等时效要求高的订单
  • 紧急度1-2:居民区普通配送
  • deadline:用户设定的期望送达时间

2.2 配送员 (Courier)

@dataclass
class Courier:
    courier_id: str         # 配送员ID
    lat/lng: float          # 当前位置
    speed: float            # 骑行速度(km/h)
    rating: float           # 评分 4.0-5.0
    fatigue: float          # 疲劳度 0-100,100最佳
    orders_today: int       # 今日已完成订单数
    region: str             # 所属区域

疲劳度含义

  • 100分:刚休息完,效率最高
  • 50分:连续工作4-6小时
  • <30分:效率严重下降,应强制休息

2.3 环境 (Environment)

@dataclass
class Environment:
    weather_factor: float   # 天气影响 0-1
    traffic_factor: float   # 交通影响 0-1

三、算法详解

3.1 距离计算:Haversine公式

def haversine(self, lat1, lng1, lat2, lng2):
    """计算两点间球面距离(km)"""
    # 1. 角度转弧度
    lat1, lng1, lat2, lng2 = map(np.radians, [lat1, lng1, lat2, lng2])
    
    # 2. Haversine公式
    dlat = lat2 - lat1
    dlng = lng2 - lng1
    a = sin²(Δlat/2) + cos(lat1) * cos(lat2) * sin²(Δlng/2)
    c = 2 * arcsin(√a)
    
    # 3. 乘以地球半径
    return 6371.0 * c

几何原理

        北极
          *
         /|\
        / | \
       /  |  \
      /   |   \
     /  d |    \   d = R × c (弧长)
    /     |     \
   /      |      \
  *_______|_______*  赤道
 lat1    lat2
    
公式推导:
a = sin²(Δlat/2) + cos(lat1) × cos(lat2) × sin²(Δlng/2)
c = 2 × arcsin(√a)
距离 = R × c

3.2 时间估算

def estimate_time(self, distance, courier_speed):
    """估算行程时间(分钟)"""
    # 基础时间 = 距离/速度
    base_time = distance / courier_speed * 60  # 转换为分钟
    
    # 环境调整:天气差/交通堵会增加耗时
    env_factor = 1.0 + weather_factor * 0.5 + traffic_factor * 0.5
    
    return base_time * env_factor

示例计算

  • 距离2km,配送员速度20km/h
  • 基础时间:2/20 × 60 = 6分钟
  • 晚高峰(交通0.5) + 阴天(天气0.3)
  • 环境因子:1.0 + 0.3×0.5 + 0.5×0.5 = 1.4
  • 实际估算时间:6 × 1.4 = 8.4分钟

3.3 多因素加权匹配分

这是核心算法,完整流程如下:

def calculate_match_score(self, order, courier):
    # ═══════════════════════════════════════════════════
    # 因素1: 距离评分 (权重30%)
    # ═══════════════════════════════════════════════════
    pickup_dist = haversine(courier.pos, order.pickup)
    delivery_dist = haversine(order.pickup, order.dropoff)
    
    # 评分逻辑:10km外得0分,0km得100分
    dist_score = max(0, 100 * (1 - pickup_dist / 10))
    
    # 示例:取货距离2km → dist_score = 100×(1-0.2) = 80分
    
    # ═══════════════════════════════════════════════════
    # 因素2: 时间评分 (权重25%)
    # ═══════════════════════════════════════════════════
    total_time = estimate_time(pickup_dist) + estimate_time(delivery_dist)
    time_window = order.deadline - order.create_time  # 可用时间
    
    time_ratio = total_time / time_window
    
    if time_ratio > 1.5:      # 预计耗时超过时间窗口150%
        time_score = 0        # 肯定超时,不可接
    else:
        time_score = max(0, 100 * (1 - time_ratio))
    
    # 示例:预计30分钟,时间窗口40分钟 → ratio=0.75
    # time_score = 100 × (1 - 0.75) = 25分
    
    # ═══════════════════════════════════════════════════
    # 因素3: 配送员质量 (权重20%)
    # ═══════════════════════════════════════════════════
    # 评分4.0=0分,5.0=50分
    rating_score = (courier.rating - 4.0) * 50
    
    # ═══════════════════════════════════════════════════
    # 因素4: 疲劳度 (权重15%)
    # ═══════════════════════════════════════════════════
    fatigue_score = courier.fatigue  # 原始疲劳度
    
    # 今日订单过多惩罚
    if courier.orders_today > 20:
        fatigue_score *= 0.7    # 严重疲劳,打7折
    elif courier.orders_today > 15:
        fatigue_score *= 0.85   # 轻度疲劳,打85折
    
    # ═══════════════════════════════════════════════════
    # 因素5: 环境适应 (权重10%)
    # ═══════════════════════════════════════════════════
    env_score = 100 * (1 - weather*0.5 - traffic*0.3)
    
    # ═══════════════════════════════════════════════════
    # 综合评分 = Σ(单项分 × 权重)
    # ═══════════════════════════════════════════════════
    total_score = (
        dist_score    * 0.30 +   # 距离
        time_score    * 0.25 +   # 时间
        rating_score  * 0.20 +   # 质量
        fatigue_score * 0.15 +   # 疲劳
        env_score     * 0.10     # 环境
    )
    
    return total_score, details

评分矩阵示例(10订单 × 8配送员):

        C001     C002     C003     C004     C005     C006     C007     C008
O001    80.4     81.0     75.7     66.3     74.5     67.5     67.9     71.0  ← O001给C002最高分
O002    80.3     81.0     75.7     66.5     74.7     67.6     68.0     71.3  ← O002给C002最高分
...

3.4 匈牙利算法全局最优求解

这是数学优化部分,保证全局最优而非贪心局部最优。

问题转化

原始问题:找到最优的 {订单→配送员} 匹配方案

数学建模:
- 构建矩阵 M[订单数][配送员数]
- M[i][j] = 订单i分配给配送员j的匹配分

求解目标:找到一个一一映射,最大化总分

算法原理

def solve_hungarian(self):
    # 1. 构建成本矩阵
    # 匈牙利算法是最小化,我们用 -score 转成成本
    cost_matrix = np.full((n_orders, n_couriers), 1e10)  # 初始化为极大值
    
    for i, order in enumerate(self.orders):
        for j, courier in enumerate(self.couriers):
            score, details = calculate_match_score(order, courier)
            if details['can_deliver']:  # 可行性检查
                cost_matrix[i, j] = -score  # 转为成本(越负越好)
    
    # 2. 调用scipy实现
    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    # row_ind: 选中的订单索引
    # col_ind: 对应的配送员索引
    
    # 3. 构建结果
    result = {}
    for i, j in zip(row_ind, col_ind):
        if cost_matrix[i, j] < 1e9:  # 有效匹配
            result[orders[i].order_id] = couriers[j].courier_id
    
    return result

匈牙利算法图解

假设3个订单、3个配送员,匹配分矩阵:

        C001  C002  C003
O001    80    65    72
O002    45    90    55
O003    75    60    85

贪心解(局部最优):
O001→C001(80), O002→C002(90), O003→? → 只能用C003(85)
总分 = 80 + 90 + 85 = 255

匈牙利解(全局最优):
O001→C003(72), O002→C002(90), O003→C001(75)  
总分 = 72 + 90 + 75 = 237 ← 等等,这不对

重新算:
O001→C001(80), O002→? 只能C002(90), O003→C003(85) 
= 255 ← 贪心是对的

换个例子:
        C001  C002  C003
O001    90    80    75
O002    80    90    75  
O003    75    75    90

贪心:O001→C001(90), O002→C002(90), O003→C003(90) = 270
匈牙利:O001→C002(80), O002→C001(80), O003→C003(90) = 250 ← 更差!

实际匈牙利会选择能最大化总分的组合

四、调度流程

输入: 10个订单, 8个配送员, 环境因素

┌─────────────────────────────────────────────────────────────┐
│ Step 1: 环境设置                                            │
│   天气因子=0.3(阴天), 交通因子=0.5(晚高峰)                    │
│   → 影响时间估算,环境越差用时越长                           │
└─────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────┐
│ Step 2: 遍历计算所有(订单,配送员)匹配分                      │
│                                                             │
│   for order in orders:                                      │
│       for courier in couriers:                              │
│           score = calculate_match_score(order, courier)     │
│           记录到矩阵                                         │
│                                                             │
│   → 输出: 10×8 匹配分矩阵                                    │
└─────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────┐
│ Step 3: 匈牙利算法求解全局最优分配                            │
│                                                             │
│   调用 linear_sum_assignment(cost_matrix)                   │
│   → 输出: 8个最优匹配对                                      │
└─────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────┐
│ Step 4: 生成详细报告                                         │
│                                                             │
│   for each match:                                           │
│       - 订单ID、分配给谁                                     │
│       - 匹配分及五项评分明细                                  │
│       - 预计取货距离、配送距离、总时间                         │
│                                                             │
│   计算统计:匹配率、总分、平均分                               │
└─────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────┐
│ 最终输出:                                                   │
│   O002 → C002 (匹配分81.0)                                  │
│   O001 → C001 (匹配分80.4)                                  │
│   O006 → C005 (匹配分78.3)                                  │
│   ...                                                       │
│                                                             │
│   未能匹配: O004, O009 (时间窗口太紧)                        │
└─────────────────────────────────────────────────────────────┘

五、权重设计

WEIGHTS = {
    'distance': 0.30,         # 距离最重要,影响取货效率
    'time': 0.25,             # 次重要,保证准时送达
    'courier_quality': 0.20,  # 配送员能力不可忽视
    'fatigue': 0.15,          # 疲劳影响安全和工作效率
    'environment': 0.10,      # 环境因素相对次要
}

为什么这样分配

因素 权重 理由
距离 30% 距离近→取货快→用户体验好
时间 25% 超时是最严重的问题
质量 20% 高手送得快且不易出错
疲劳 15% 疲劳影响安全,不能忽视
环境 10% 影响所有订单,差异化小

六、可拓展维度

当前系统 ──────────────────────────────────────────▶ 生产系统

现有功能:
✓ 多因素评分
✓ 匈牙利全局最优
✓ 疲劳度考虑
✓ 环境因素

待扩展功能:
○ 实时交通API接入(高德/百度)
○ 骑手实时GPS追踪
○ 多订单路径优化(VRP问题)
○ 动态权重调整(高峰期自动调优)
○ 机器学习预测需求
○ 异常处理(改派、取消)

七、代码

"""
配送调度系统 - 生产级实现
1. 多因素加权匹配分计算
2. 全局最优分配(匈牙利算法 + 遗传算法)
3. 真实场景模拟
"""

import numpy as np
from scipy.optimize import linear_sum_assignment
from typing import List, Dict, Tuple
from dataclasses import dataclass
import random


@dataclass
class Order:
    """订单"""
    order_id: str
    pickup_lat: float
    pickup_lng: float
    dropoff_lat: float
    dropoff_lng: float
    weight: float  # kg
    urgency: int   # 1-5, 5最高
    create_time: int  # 创建时间(秒)
    deadline: int     # 最晚送达时间(秒)
    fee: float        # 配送费


@dataclass
class Courier:
    """配送员"""
    courier_id: str
    lat: float
    lng: float
    speed: float       # km/h
    rating: float      # 评分 4.0-5.0
    fatigue: float     # 疲劳度 0-100, 100为最佳
    orders_today: int  # 今日已完成订单
    region: str        # 所属区域


@dataclass
class Environment:
    """环境因素"""
    weather_factor: float    # 天气影响 0-1, 1最差
    traffic_factor: float    # 交通影响 0-1, 1最差


class DispatchScheduler:
    """配送调度器"""

    EARTH_RADIUS = 6371.0  # km

    # 多因素权重(可调整)
    WEIGHTS = {
        'distance': 0.30,      # 距离因素
        'time': 0.25,           # 时间因素
        'courier_quality': 0.20,  # 配送员质量
        'fatigue': 0.15,        # 疲劳因素
        'environment': 0.10,    # 环境因素
    }

    def __init__(self):
        self.orders: List[Order] = []
        self.couriers: List[Courier] = []
        self.env = Environment(weather_factor=0.2, traffic_factor=0.3)

    def set_environment(self, weather: float, traffic: float):
        """设置环境因素"""
        self.env.weather_factor = weather
        self.env.traffic_factor = traffic

    def haversine(self, lat1, lng1, lat2, lng2):
        """计算两点间球面距离(km)"""
        lat1, lng1, lat2, lng2 = map(np.radians, [lat1, lng1, lat2, lng2])
        dlat, dlng = lat2 - lat1, lng2 - lng1
        a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlng/2)**2
        return self.EARTH_RADIUS * 2 * np.arcsin(np.sqrt(a))

    def estimate_time(self, distance: float, courier_speed: float) -> float:
        """估算行程时间(分钟),考虑环境和交通"""
        base_time = distance / courier_speed * 60  # 分钟

        # 环境调整系数
        env_factor = 1.0 + self.env.weather_factor * 0.5 + self.env.traffic_factor * 0.5

        return base_time * env_factor

    def calculate_match_score(self, order: Order, courier: Courier) -> Tuple[float, Dict]:
        """
        计算订单-配送员匹配分数

        Returns:
            (score, details): 分数和详细评分项
        """
        # === 1. 距离因素 ===
        pickup_dist = self.haversine(courier.lat, courier.lng, order.pickup_lat, order.pickup_lng)
        delivery_dist = self.haversine(order.pickup_lat, order.pickup_lng, order.dropoff_lat, order.dropoff_lng)
        total_dist = pickup_dist + delivery_dist

        # 距离评分:越近越高,10km为0分
        dist_score = max(0, 100 * (1 - pickup_dist / 10))

        # === 2. 时间因素 ===
        pickup_time = self.estimate_time(pickup_dist, courier.speed)
        delivery_time = self.estimate_time(delivery_dist, courier.speed)
        total_time = pickup_time + delivery_time

        # 可用时间窗口
        time_window = order.deadline - order.create_time

        # 时间紧迫度评分
        if time_window <= 0:
            time_score = 0  # 已超时
        else:
            time_ratio = total_time / time_window
            if time_ratio > 1.5:
                time_score = 0  # 肯定超时
            else:
                time_score = max(0, 100 * (1 - time_ratio))

        # === 3. 配送员质量 ===
        rating_score = (courier.rating - 4.0) * 50  # 4.0分=0分, 5.0分=50分

        # === 4. 疲劳因素 ===
        # 疲劳度高效率低,疲劳度低效率高
        fatigue_score = courier.fatigue  # 0-100

        # 今日订单过多惩罚
        if courier.orders_today > 20:
            fatigue_score *= 0.7
        elif courier.orders_today > 15:
            fatigue_score *= 0.85

        # === 5. 环境适应 ===
        env_score = 100 * (1 - self.env.weather_factor * 0.5 - self.env.traffic_factor * 0.3)

        # === 综合评分 ===
        total_score = (
            dist_score * self.WEIGHTS['distance'] +
            time_score * self.WEIGHTS['time'] +
            rating_score * self.WEIGHTS['courier_quality'] +
            fatigue_score * self.WEIGHTS['fatigue'] +
            env_score * self.WEIGHTS['environment']
        )

        details = {
            'pickup_dist': pickup_dist,
            'delivery_dist': delivery_dist,
            'total_dist': total_dist,
            'pickup_time': pickup_time,
            'delivery_time': delivery_time,
            'total_time': total_time,
            'time_window': time_window,
            'dist_score': dist_score,
            'time_score': time_score,
            'rating_score': rating_score,
            'fatigue_score': fatigue_score,
            'env_score': env_score,
            'total_score': total_score,
            'can_deliver': time_ratio <= 1.5 and pickup_dist <= 5.0
        }

        return total_score, details

    def solve_hungarian(self) -> Dict[str, str]:
        """
        使用匈牙利算法求解全局最优分配

        Returns:
            {order_id: courier_id}: 订单分配结果
        """
        if not self.orders or not self.couriers:
            return {}

        n_orders = len(self.orders)
        n_couriers = len(self.couriers)

        # 构建成本矩阵(负数为收益,转为越小越好)
        cost_matrix = np.full((n_orders, n_couriers), 1e10)

        valid_matches = []
        for i, order in enumerate(self.orders):
            for j, courier in enumerate(self.couriers):
                score, details = self.calculate_match_score(order, courier)
                if details['can_deliver']:
                    cost_matrix[i, j] = -score
                    valid_matches.append((i, j, score))

        if not valid_matches:
            return {}

        # 匈牙利算法
        row_ind, col_ind = linear_sum_assignment(cost_matrix)

        # 构建最优分配
        result = {}
        for i, j in zip(row_ind, col_ind):
            if cost_matrix[i, j] < 1e9:  # 有效匹配
                result[self.orders[i].order_id] = self.couriers[j].courier_id

        return result

    def solve_greedy(self) -> Dict[str, str]:
        """
        贪心算法求解(快速但非最优)
        """
        if not self.orders or not self.couriers:
            return {}

        result = {}
        available_couriers = set(range(len(self.couriers)))
        orders = sorted(self.orders, key=lambda o: o.urgency, reverse=True)  # 优先高紧急

        for order in orders:
            best_score = -1
            best_courier_idx = -1

            for idx in available_couriers:
                courier = self.couriers[idx]
                score, details = self.calculate_match_score(order, courier)

                if details['can_deliver'] and score > best_score:
                    best_score = score
                    best_courier_idx = idx

            if best_courier_idx >= 0:
                result[order.order_id] = self.couriers[best_courier_idx].courier_id
                available_couriers.remove(best_courier_idx)

        return result

    def get_full_report(self, assignment: Dict[str, str]) -> List[Dict]:
        """
        生成完整分配报告
        """
        report = []

        for order_id, courier_id in assignment.items():
            order = next(o for o in self.orders if o.order_id == order_id)
            courier = next(c for c in self.couriers if c.courier_id == courier_id)
            score, details = self.calculate_match_score(order, courier)

            report.append({
                'order_id': order_id,
                'courier_id': courier_id,
                'order_urgency': order.urgency,
                'order_fee': order.fee,
                'pickup_dist': details['pickup_dist'],
                'delivery_dist': details['delivery_dist'],
                'total_time': details['total_time'],
                'match_score': score,
                'score_breakdown': details
            })

        # 按匹配分排序
        report.sort(key=lambda x: x['match_score'], reverse=True)
        return report


def create_realistic_scenario() -> Tuple[List[Order], List[Courier], Environment]:
    """
    创建真实场景数据

    场景:晚高峰时期,某商圈周边10km范围
    时间:17:00-19:00高峰段
    """

    # === 订单数据(10个待配送订单) ===
    orders = [
        # 商场取餐配送(高紧急度)
        Order("O001", 39.9123, 116.4421, 39.9089, 116.4356, 2.5, 5,
              create_time=0, deadline=35, fee=8.0),   # 紧急取餐
        Order("O002", 39.9123, 116.4421, 39.9156, 116.4498, 1.2, 4,
              create_time=2, deadline=40, fee=6.0),   # 商场订单
        Order("O003", 39.9088, 116.4401, 39.9034, 116.4512, 3.0, 3,
              create_time=5, deadline=50, fee=5.0),   # 午餐配送

        # 写字楼配送(中高紧急度)
        Order("O004", 39.9156, 116.4388, 39.9189, 116.4321, 1.5, 5,
              create_time=3, deadline=30, fee=10.0),  # 写字楼紧急
        Order("O005", 39.9101, 116.4456, 39.9212, 116.4398, 2.0, 4,
              create_time=6, deadline=45, fee=7.0),  # 商务区

        # 普通居民区
        Order("O006", 39.9067, 116.4434, 39.9023, 116.4501, 4.5, 2,
              create_time=8, deadline=60, fee=4.0),  # 大件配送
        Order("O007", 39.9145, 116.4367, 39.9098, 116.4289, 1.0, 3,
              create_time=10, deadline=55, fee=5.5), # 正常配送

        # 远距离订单
        Order("O008", 39.9201, 116.4512, 39.9256, 116.4601, 2.0, 3,
              create_time=12, deadline=70, fee=9.0),  # 远距离
        Order("O009", 39.9056, 116.4321, 39.9012, 116.4256, 1.8, 4,
              create_time=15, deadline=50, fee=6.5), # 中等距离
        Order("O010", 39.9189, 116.4445, 39.9234, 116.4512, 2.2, 2,
              create_time=18, deadline=65, fee=5.0),  # 低紧急
    ]

    # === 配送员数据(8个可用配送员) ===
    couriers = [
        # 核心商圈配送员(近、高评分)
        Courier("C001", 39.9112, 116.4412, speed=25, rating=4.8,
                fatigue=85, orders_today=12, region="A"),
        Courier("C002", 39.9134, 116.4389, speed=22, rating=4.9,
                fatigue=90, orders_today=8, region="A"),
        Courier("C003", 39.9098, 116.4456, speed=28, rating=4.6,
                fatigue=75, orders_today=15, region="A"),

        # 边缘区域配送员
        Courier("C004", 39.9178, 116.4321, speed=20, rating=4.5,
                fatigue=60, orders_today=18, region="B"),
        Courier("C005", 39.9045, 116.4412, speed=24, rating=4.7,
                fatigue=80, orders_today=10, region="C"),

        # 新手/低评分配送员
        Courier("C006", 39.9167, 116.4489, speed=18, rating=4.2,
                fatigue=70, orders_today=6, region="B"),
        Courier("C007", 39.9089, 116.4356, speed=26, rating=4.4,
                fatigue=55, orders_today=20, region="A"),  # 疲劳度高

        # 偏远配送员
        Courier("C008", 39.9234, 116.4512, speed=22, rating=4.6,
                fatigue=88, orders_today=5, region="D"),
    ]

    # === 环境设置(晚高峰+阴天) ===
    env = Environment(
        weather_factor=0.3,   # 阴天,能见度一般
        traffic_factor=0.5    # 晚高峰,中度拥堵
    )

    return orders, couriers, env


def main():
    """主函数"""
    print("=" * 70)
    print(" " * 20 + "配送调度系统 - 全局最优分配")
    print("=" * 70)

    # 创建场景
    orders, couriers, env = create_realistic_scenario()

    # 初始化调度器
    scheduler = DispatchScheduler()
    scheduler.orders = orders
    scheduler.couriers = couriers
    scheduler.set_environment(env.weather_factor, env.traffic_factor)

    # 输出环境信息
    print(f"\n【环境状况】")
    print(f"  天气影响因子: {env.weather_factor:.0%} (阴天)")
    print(f"  交通影响因子: {env.traffic_factor:.0%} (晚高峰)")
    print(f"  配送员数量: {len(couriers)}")
    print(f"  订单数量: {len(orders)}")

    # 输出订单信息
    print(f"\n【待配送订单】")
    print("-" * 70)
    print(f"{'订单ID':<8} {'紧急度':<6} {'配送费':<8} {'时间窗口':<10} {'重量':<6}")
    print("-" * 70)
    for o in sorted(orders, key=lambda x: -x.urgency):
        print(f"{o.order_id:<8} {'*'*o.urgency:<6} {o.fee}元   {o.deadline}分钟   {o.weight}kg")

    # 输出配送员信息
    print(f"\n【可用配送员】")
    print("-" * 70)
    print(f"{'配送员ID':<10} {'位置':<18} {'速度':<8} {'评分':<6} {'疲劳度':<8} {'今日单量'}")
    print("-" * 70)
    for c in couriers:
        loc = f"({c.lat:.4f},{c.lng:.4f})"
        fatigue_bar = "|" * int(c.fatigue/10) + "-" * (10 - int(c.fatigue/10))
        print(f"{c.courier_id:<10} {loc:<18} {c.speed}km/h  {c.rating}  {fatigue_bar} {c.orders_today}单")

    # === 计算所有匹配分 ===
    print(f"\n{'=' * 70}")
    print("【匹配分计算 - 所有订单×配送员】")
    print("=" * 70)

    # 打印表头
    header = f"{'订单':<8}"
    for c in couriers:
        header += f" {c.courier_id:<10}"
    print(header)
    print("-" * 90)

    # 计算并打印矩阵
    score_matrix = []
    for order in orders:
        row = f"{order.order_id:<8}"
        row_scores = []
        for courier in couriers:
            score, details = scheduler.calculate_match_score(order, courier)
            if details['can_deliver']:
                row += f" {score:>8.1f} "
            else:
                row += f" {'N/A':<10}"
            row_scores.append(score if details['can_deliver'] else -1)
        score_matrix.append(row_scores)
        print(row)

    # === 求解全局最优 ===
    print(f"\n{'=' * 70}")
    print("【全局最优分配结果 - 匈牙利算法】")
    print("=" * 70)

    result = scheduler.solve_hungarian()

    # 生成详细报告
    report = scheduler.get_full_report(result)

    print(f"\n{'序号':<4} {'订单ID':<8} {'分配给':<10} {'匹配分':<8} {'取件距离':<10} {'配送距离':<10} {'预计时间':<10}")
    print("-" * 70)

    total_fee = 0
    total_score = 0
    for i, r in enumerate(report, 1):
        d = r['score_breakdown']
        print(f"{i:<4} {r['order_id']:<8} {r['courier_id']:<10} {r['match_score']:>6.1f}   "
              f"{d['pickup_dist']:>5.2f}km   {d['delivery_dist']:>5.2f}km   {d['total_time']:>5.1f}分钟")
        total_fee += r['order_fee']
        total_score += r['match_score']

    # 评分明细
    print(f"\n{'=' * 70}")
    print("【分配结果评分明细】")
    print("=" * 70)

    for r in report:
        d = r['score_breakdown']
        print(f"\n{r['order_id']} -> {r['courier_id']}")
        print(f"  综合评分: {r['match_score']:.1f}")
        print(f"  |-- 距离分({scheduler.WEIGHTS['distance']:.0%}): {d['dist_score']:.1f} x {scheduler.WEIGHTS['distance']:.0%} = {d['dist_score']*scheduler.WEIGHTS['distance']:.2f}")
        print(f"  |-- 时间分({scheduler.WEIGHTS['time']:.0%}): {d['time_score']:.1f} x {scheduler.WEIGHTS['time']:.0%} = {d['time_score']*scheduler.WEIGHTS['time']:.2f}")
        print(f"  |-- 质量分({scheduler.WEIGHTS['courier_quality']:.0%}): {d['rating_score']:.1f} x {scheduler.WEIGHTS['courier_quality']:.0%} = {d['rating_score']*scheduler.WEIGHTS['courier_quality']:.2f}")
        print(f"  |-- 疲劳分({scheduler.WEIGHTS['fatigue']:.0%}): {d['fatigue_score']:.1f} x {scheduler.WEIGHTS['fatigue']:.0%} = {d['fatigue_score']*scheduler.WEIGHTS['fatigue']:.2f}")
        print(f"  '-- 环境分({scheduler.WEIGHTS['environment']:.0%}): {d['env_score']:.1f} x {scheduler.WEIGHTS['environment']:.0%} = {d['env_score']*scheduler.WEIGHTS['environment']:.2f}")

    # 未匹配订单
    matched_orders = set(result.keys())
    unmatched = [o for o in orders if o.order_id not in matched_orders]

    if unmatched:
        print(f"\n{'=' * 70}")
        print("【未能匹配的订单】")
        print("=" * 70)
        for o in unmatched:
            print(f"  {o.order_id}: 紧急度{o.urgency},无法在规定时间内送达")

    # 统计摘要
    print(f"\n{'=' * 70}")
    print("【调度统计】")
    print("=" * 70)
    print(f"  总订单数: {len(orders)}")
    print(f"  成功匹配: {len(result)}")
    print(f"  匹配率: {len(result)/len(orders)*100:.1f}%")
    print(f"  总匹配分: {total_score:.1f}")
    print(f"  平均匹配分: {total_score/len(result) if result else 0:.1f}")
    print(f"  预期配送费: {total_fee:.1f}元")


if __name__ == "__main__":
    main()

Logo

电商企业物流数字化转型必备!快递鸟 API 接口,72 小时快速完成物流系统集成。全流程实战1V1指导,营造开放的API技术生态圈。

更多推荐