1 概览

  • • 痛点:WMS 每次跳转第三方查询耦合强、无法长期保存轨迹;需要统一管理并保存 3 年以供审计/客服查询。

  • • 目标:实现「一次推送、外挂持续跟踪直至签收、三年归档」的可扩展、可观测、可运维系统。

  • • 关键特性:异步解耦、按状态动态轮询(Adaptive Polling)、限流保护、幂等入库、分层存储与归档。

2 总体架构

3 核心设计要点

3.1 接入模式

  • • 首选:WMS 采用 HTTP POST 接入(幂等)。

  • • 鉴权:HMAC 签名或 OAuth2,接口需限流与白名单。

3.2 调度模型(关键)

  • • 任务驱动:每个单号在 shipment_task 中维护 next_check_timecheck_interval_secondsstatusno_update_count

  • • 批量扫描:调度器按 next_check_time <= now() 批量读取(分页/limit),并按 MOD(id, N) 做分片,推入 MQ。

  • • Adaptive Polling:根据状态与最近是否有更新动态调整 check_interval(指数退避,上限 6 小时)。

  • • 停止条件:检测到 DELIVERED(签收),停止轮询并进入归档计划。

3.3 执行策略

  • • 固定大小 Worker 池:Worker 为常驻进程(容器),消费者组并发处理 MQ 任务,避免为每单创建线程。

  • • 限流:按快递公司使用 Redisson RateLimiter 或令牌桶保护对方接口。

  • • 幂等写入:通过 shipment_id + event_time + desc_hash 做唯一约束或写入前存在性判断。

  • • 异常处理:失败重试(指数退避),失败超过阈值进入 DLQ 并告警。

3.4 存储与归档

  • • 热存:最近 90/180 天的轨迹保存在 MySQL(或 NoSQL)便于快速查询。

  • • 冷存:超过 N 天(如 90 天)将原始 payload 与老事件归档到对象存储,主库写入归档路径索引。

  • • 分区/分表courier_event 按月分区以便批量删除/迁移。

3.5 可观测性与告警

  • • 指标:队列深度、处理吞吐、QPS、每家快递的失败率、平均延迟(发货→首次轨迹)、未签收超过阈值数量。

  • • 日志:存储原始请求/响应(至少保留 30—90 天)并脱敏。

  • • 告警:连续 N 次抓取失败、队列长度异常增长、接口被限流。

4 数据模型(示例 DDL)

-- 任务表:负责调度

CREATE TABLE shipment_task (
  id BIGINTPRIMARY KEY AUTO_INCREMENT,
  order_id VARCHAR(64),
  tracking_no VARCHAR(64) NOT NULL,
  courier_code VARCHAR(32),
  status VARCHAR(32) DEFAULT'NEW',
  last_event_time DATETIME,
  last_check_time DATETIME,
  next_check_time DATETIME,
  check_interval_seconds INTDEFAULT1800,
  no_update_count INTDEFAULT0,
  retry_count INTDEFAULT0,
  max_retry INTDEFAULT10,
  delivered BOOLEANDEFAULTFALSE,
  raw_meta JSON,
  created_at DATETIME DEFAULTCURRENT_TIMESTAMP,
  updated_at DATETIME DEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,
UNIQUE KEY uq_tracking (tracking_no, courier_code),
  INDEX idx_next_check (next_check_time),
  INDEX idx_status (status)

);

-- 轨迹事件(按月分区)

CREATE TABLE courier_event (
  id BIGINTPRIMARY KEY AUTO_INCREMENT,
  shipment_id BIGINTNOT NULL,
  event_time DATETIME NOT NULL,
  location VARCHAR(128),
  description TEXT,
  status_code VARCHAR(32),
  raw_payload JSON,
  created_at DATETIME DEFAULTCURRENT_TIMESTAMP,
UNIQUE KEY uq_event (shipment_id, event_time, MD5(description(1000)))
);

-- 归档路径记录

CREATE TABLE courier_archive (
  id BIGINTPRIMARY KEY AUTO_INCREMENT,
  shipment_id BIGINTNOT NULL,
  s3_path VARCHAR(255),
  archived_at DATETIME,
  note VARCHAR(255)
);

5 核心代码

5.1 接入 Controller(幂等 + 入队)

@RestController
@RequestMapping("/api/v1/shipments")
publicclassShipmentController {

privatefinal ShipmentTaskService taskService;

@PostMapping
public ResponseEntity<?> accept(@RequestBody ShipmentDto dto, @RequestHeader("X-Signature") String sig) {
    // 1. 验证签名(HMAC)
    if (!AuthUtil.verify(sig, dto)) {
      return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build();
    }

    // 2. 幂等写入或更新任务表
    ShipmentTasktask= taskService.upsertFromDto(dto);

    // 3. 推送到 MQ(由调度器负责更完整的调度逻辑,此处可简化为立即推一次)
    taskService.enqueueImmediateCheck(task.getId());

    return ResponseEntity.accepted().body(Map.of("shipmentId", task.getId()));
  }
}

5.2 调度器(批量扫描 & 分片)

@Component
publicclassScheduler {
@Autowired
private ShipmentTaskRepo repo;
@Autowired
private MQProducer mqProducer;


@Scheduled(cron = "0 * * * * *")// 每分钟触发一次
publicvoidscanAndEnqueue() {
    LocalDateTimenow= LocalDateTime.now();
    intshardCount= Integer.parseInt(env.getProperty("app.shard.count", "4"));
    intmyShard= Integer.parseInt(env.getProperty("app.shard.index", "0"));


    List<ShipmentTask> due = repo.findDueTasks(now, myShard, shardCount, 1000);
    for (ShipmentTask t : due) {
    mqProducer.send("tracking_query_topic", t.getId());
    // update next_check_time optimistically to avoid重复选中
    t.setNextCheckTime(now.plusSeconds(t.getCheckIntervalSeconds()));
    repo.save(t);
    }
  }
}

findDueTasks SQL 示例:

SELECT * FROM shipment_task
WHERE next_check_time <= :now
AND status IN ('NEW','IN_TRANSIT','PENDING_UPDATE')
AND MOD(id, :shardCount) = :shardIndex
ORDER BY next_check_time
LIMIT :limit

5.3 Worker(MQ 消费者)

@RocketMQMessageListener(topic = "tracking_query_topic", consumerGroup = "tracking_fetchers")
publicclassTrackingFetcherimplementsRocketMQListener<Long> {
@Autowired
private ShipmentTaskRepo repo;
@Autowired
private CourierAdapterFactory adapterFactory;
@Autowired
private RedissonClient redisson;


@Override
publicvoidonMessage(Long taskId) {
ShipmentTasktask= repo.findById(taskId).orElse(null);
if (task == null) return;


// rate limit key per courier
RRateLimiterlimiter= redisson.getRateLimiter("courier:" + task.getCourierCode());
if (!limiter.tryAcquire()) {
    // 未获取到令牌,延迟重试
    task.setNextCheckTime(LocalDateTime.now().plusSeconds(30));
    repo.save(task);
    return;
  }


CourierClientclient= adapterFactory.getClient(task.getCourierCode());
try {
    TrackingResponseresp= client.query(task.getTrackingNo());
    List<Event> events = client.parse(resp);
    booleanchanged= persistEvents(task, events, resp);


    if (resp.isDelivered()) {
      task.setStatus("DELIVERED");
      task.setDelivered(true);
      task.setNextCheckTime(null);
    } else {
      adjustIntervalAfterCheck(task, changed);
    }
    } catch (Exception ex) {
      task.setRetryCount(task.getRetryCount() + 1);
      if (task.getRetryCount() > task.getMaxRetry()) task.setStatus("EXCEPTION");
      task.setNextCheckTime(LocalDateTime.now().plusSeconds(60));
    } finally {
      repo.save(task);
    }
  }
}

5.4 适配器接口(扩展快递方)

public interfaceCourierClient {
    TrackingResponse query(String trackingNo)throws IOException;
    List<Event> parse(TrackingResponse resp);
}


publicclassSFExpressClientimplementsCourierClient {
    public TrackingResponse query(String trackingNo) { /* HTTP 调用 SF API */ }
    public List<Event> parse(TrackingResponse resp) { /* 解析 SF 返回 */ }
}

6 性能估算与容量规划

输入参数

  • • 年度出库单量:1,000,000 单/年

  • • 单均轨迹事件数:10 条

  • • 平均事件 JSON 大小:600 字节

  • • 保留期限:3 年

存储估算

  • • 每单事件数据:10 * 600 = 6,000 bytes ≈ 6KB

  • • 年度数据:1,000,000 * 6KB = 6,000,000 KB ≈ 5.72 GB

  • • 3 年数据 ≈ 17.2 GB(不含索引和原始 payload)

  • • 建议预留 3×—5× 因为索引/原始 payload/日志:建议 100GB 存储预算

吞吐估算(高峰)

  • • 假设同时在途 100k 单号,平均每小时查一次 → 每小时 100k 次 → 每秒 ~28 次。

  • • Worker 节点:每节点 200 并发可以轻松处理,故 1-3 节点即可。

Logo

电商企业物流数字化转型必备!快递鸟 API 接口,72 小时快速完成物流系统集成。全流程实战1V1指导,营造开放的API技术生态圈。

更多推荐