快递轨迹外挂组件设计方案
• 指标:队列深度、处理吞吐、QPS、每家快递的失败率、平均延迟(发货→首次轨迹)、未签收超过阈值数量。:异步解耦、按状态动态轮询(Adaptive Polling)、限流保护、幂等入库、分层存储与归档。:超过 N 天(如 90 天)将原始 payload 与老事件归档到对象存储,主库写入归档路径索引。• 年度数据:1,000,000 * 6KB = 6,000,000 KB ≈ 5.72 GB。
1 概览
-
• 痛点:WMS 每次跳转第三方查询耦合强、无法长期保存轨迹;需要统一管理并保存 3 年以供审计/客服查询。
-
• 目标:实现「一次推送、外挂持续跟踪直至签收、三年归档」的可扩展、可观测、可运维系统。
-
• 关键特性:异步解耦、按状态动态轮询(Adaptive Polling)、限流保护、幂等入库、分层存储与归档。
2 总体架构

3 核心设计要点
3.1 接入模式
-
• 首选:WMS 采用 HTTP POST 接入(幂等)。
-
• 鉴权:HMAC 签名或 OAuth2,接口需限流与白名单。
3.2 调度模型(关键)
-
• 任务驱动:每个单号在
shipment_task中维护next_check_time、check_interval_seconds、status、no_update_count。 -
• 批量扫描:调度器按
next_check_time <= now()批量读取(分页/limit),并按MOD(id, N)做分片,推入 MQ。 -
• Adaptive Polling:根据状态与最近是否有更新动态调整
check_interval(指数退避,上限 6 小时)。 -
• 停止条件:检测到
DELIVERED(签收),停止轮询并进入归档计划。
3.3 执行策略
-
• 固定大小 Worker 池:Worker 为常驻进程(容器),消费者组并发处理 MQ 任务,避免为每单创建线程。
-
• 限流:按快递公司使用 Redisson RateLimiter 或令牌桶保护对方接口。
-
• 幂等写入:通过
shipment_id + event_time + desc_hash做唯一约束或写入前存在性判断。 -
• 异常处理:失败重试(指数退避),失败超过阈值进入 DLQ 并告警。
3.4 存储与归档
-
• 热存:最近 90/180 天的轨迹保存在 MySQL(或 NoSQL)便于快速查询。
-
• 冷存:超过 N 天(如 90 天)将原始 payload 与老事件归档到对象存储,主库写入归档路径索引。
-
• 分区/分表:
courier_event按月分区以便批量删除/迁移。
3.5 可观测性与告警
-
• 指标:队列深度、处理吞吐、QPS、每家快递的失败率、平均延迟(发货→首次轨迹)、未签收超过阈值数量。
-
• 日志:存储原始请求/响应(至少保留 30—90 天)并脱敏。
-
• 告警:连续 N 次抓取失败、队列长度异常增长、接口被限流。
4 数据模型(示例 DDL)
-- 任务表:负责调度
CREATE TABLE shipment_task (
id BIGINTPRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(64),
tracking_no VARCHAR(64) NOT NULL,
courier_code VARCHAR(32),
status VARCHAR(32) DEFAULT'NEW',
last_event_time DATETIME,
last_check_time DATETIME,
next_check_time DATETIME,
check_interval_seconds INTDEFAULT1800,
no_update_count INTDEFAULT0,
retry_count INTDEFAULT0,
max_retry INTDEFAULT10,
delivered BOOLEANDEFAULTFALSE,
raw_meta JSON,
created_at DATETIME DEFAULTCURRENT_TIMESTAMP,
updated_at DATETIME DEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,
UNIQUE KEY uq_tracking (tracking_no, courier_code),
INDEX idx_next_check (next_check_time),
INDEX idx_status (status)
);
-- 轨迹事件(按月分区)
CREATE TABLE courier_event (
id BIGINTPRIMARY KEY AUTO_INCREMENT,
shipment_id BIGINTNOT NULL,
event_time DATETIME NOT NULL,
location VARCHAR(128),
description TEXT,
status_code VARCHAR(32),
raw_payload JSON,
created_at DATETIME DEFAULTCURRENT_TIMESTAMP,
UNIQUE KEY uq_event (shipment_id, event_time, MD5(description(1000)))
);
-- 归档路径记录
CREATE TABLE courier_archive (
id BIGINTPRIMARY KEY AUTO_INCREMENT,
shipment_id BIGINTNOT NULL,
s3_path VARCHAR(255),
archived_at DATETIME,
note VARCHAR(255)
);
5 核心代码
5.1 接入 Controller(幂等 + 入队)
@RestController
@RequestMapping("/api/v1/shipments")
publicclassShipmentController {
privatefinal ShipmentTaskService taskService;
@PostMapping
public ResponseEntity<?> accept(@RequestBody ShipmentDto dto, @RequestHeader("X-Signature") String sig) {
// 1. 验证签名(HMAC)
if (!AuthUtil.verify(sig, dto)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build();
}
// 2. 幂等写入或更新任务表
ShipmentTasktask= taskService.upsertFromDto(dto);
// 3. 推送到 MQ(由调度器负责更完整的调度逻辑,此处可简化为立即推一次)
taskService.enqueueImmediateCheck(task.getId());
return ResponseEntity.accepted().body(Map.of("shipmentId", task.getId()));
}
}
5.2 调度器(批量扫描 & 分片)
@Component
publicclassScheduler {
@Autowired
private ShipmentTaskRepo repo;
@Autowired
private MQProducer mqProducer;
@Scheduled(cron = "0 * * * * *")// 每分钟触发一次
publicvoidscanAndEnqueue() {
LocalDateTimenow= LocalDateTime.now();
intshardCount= Integer.parseInt(env.getProperty("app.shard.count", "4"));
intmyShard= Integer.parseInt(env.getProperty("app.shard.index", "0"));
List<ShipmentTask> due = repo.findDueTasks(now, myShard, shardCount, 1000);
for (ShipmentTask t : due) {
mqProducer.send("tracking_query_topic", t.getId());
// update next_check_time optimistically to avoid重复选中
t.setNextCheckTime(now.plusSeconds(t.getCheckIntervalSeconds()));
repo.save(t);
}
}
}
findDueTasks SQL 示例:
SELECT * FROM shipment_task
WHERE next_check_time <= :now
AND status IN ('NEW','IN_TRANSIT','PENDING_UPDATE')
AND MOD(id, :shardCount) = :shardIndex
ORDER BY next_check_time
LIMIT :limit
5.3 Worker(MQ 消费者)
@RocketMQMessageListener(topic = "tracking_query_topic", consumerGroup = "tracking_fetchers")
publicclassTrackingFetcherimplementsRocketMQListener<Long> {
@Autowired
private ShipmentTaskRepo repo;
@Autowired
private CourierAdapterFactory adapterFactory;
@Autowired
private RedissonClient redisson;
@Override
publicvoidonMessage(Long taskId) {
ShipmentTasktask= repo.findById(taskId).orElse(null);
if (task == null) return;
// rate limit key per courier
RRateLimiterlimiter= redisson.getRateLimiter("courier:" + task.getCourierCode());
if (!limiter.tryAcquire()) {
// 未获取到令牌,延迟重试
task.setNextCheckTime(LocalDateTime.now().plusSeconds(30));
repo.save(task);
return;
}
CourierClientclient= adapterFactory.getClient(task.getCourierCode());
try {
TrackingResponseresp= client.query(task.getTrackingNo());
List<Event> events = client.parse(resp);
booleanchanged= persistEvents(task, events, resp);
if (resp.isDelivered()) {
task.setStatus("DELIVERED");
task.setDelivered(true);
task.setNextCheckTime(null);
} else {
adjustIntervalAfterCheck(task, changed);
}
} catch (Exception ex) {
task.setRetryCount(task.getRetryCount() + 1);
if (task.getRetryCount() > task.getMaxRetry()) task.setStatus("EXCEPTION");
task.setNextCheckTime(LocalDateTime.now().plusSeconds(60));
} finally {
repo.save(task);
}
}
}
5.4 适配器接口(扩展快递方)
public interfaceCourierClient {
TrackingResponse query(String trackingNo)throws IOException;
List<Event> parse(TrackingResponse resp);
}
publicclassSFExpressClientimplementsCourierClient {
public TrackingResponse query(String trackingNo) { /* HTTP 调用 SF API */ }
public List<Event> parse(TrackingResponse resp) { /* 解析 SF 返回 */ }
}
6 性能估算与容量规划
输入参数
-
• 年度出库单量:1,000,000 单/年
-
• 单均轨迹事件数:10 条
-
• 平均事件 JSON 大小:600 字节
-
• 保留期限:3 年
存储估算
-
• 每单事件数据:10 * 600 = 6,000 bytes ≈ 6KB
-
• 年度数据:1,000,000 * 6KB = 6,000,000 KB ≈ 5.72 GB
-
• 3 年数据 ≈ 17.2 GB(不含索引和原始 payload)
-
• 建议预留 3×—5× 因为索引/原始 payload/日志:建议 100GB 存储预算
吞吐估算(高峰)
-
• 假设同时在途 100k 单号,平均每小时查一次 → 每小时 100k 次 → 每秒 ~28 次。
-
• Worker 节点:每节点 200 并发可以轻松处理,故 1-3 节点即可。
更多推荐




所有评论(0)