# WhatsApp 营销技术实践:从 0 搭建可量化的私域触达系统
本文面向有 Python/SQL 基础的技术与运营同学,分享如何基于 WhatsApp Business API 搭建一套可观测、可回滚的批量营销系统。
一、问题定义:为什么需要工程化方案
某跨境电商团队起初用人工客服在 WhatsApp 上发促销消息。用户量破万后,暴露出三个工程问题:
- 无法规模化:人工发 1 万条消息耗时数天,且容易出错。
- 不可归因:不知道哪条消息带来了复购订单。
- 无风险控制:发送频率、文案质量、账号状态缺乏统一监控。
团队决定自建一层营销中台,核心目标:
- 日发送量 ≥ 5 万条
- 消息级点击归因
- 账号投诉率 < 0.3%
二、整体技术架构
┌─────────────┐ ┌──────────────┐ ┌─────────────────┐
│ 订单/用户表 │────▶│ 用户分层服务 │────▶ │ 消息模板引擎 │
│ (MySQL) │ │ (Python) │ │ (Jinja2) │
└─────────────┘ └──────────────┘ └─────────────────┘
│
┌─────────────┐ ┌──────────────┐ │
│ 效果报表 │◀────│ 回调处理器 │◀────────── ┘
│ (BI/Metabase)│ │ (Webhook) │
└─────────────┘ └──────────────┘
核心链路:数据层 → 人群圈选 → 模板渲染 → 批量发送 → Webhook 回执 → 数据仓库归因。
三、数据层:用户标签与订单表设计
-- 用户主表
CREATE TABLE users (
user_id BIGINT PRIMARY KEY,
phone VARCHAR(20) NOT NULL,
country_code VARCHAR(5),
segment VARCHAR(32), -- 'high_value', 'churn_risk', 'new_user'
wa_opt_in BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP
);
-- 消息发送记录表
CREATE TABLE wa_campaign_logs (
log_id BIGINT AUTO_INCREMENT PRIMARY KEY,
user_id BIGINT,
campaign_id VARCHAR(32),
template_name VARCHAR(64),
rendered_message TEXT,
sent_at TIMESTAMP,
status VARCHAR(16), -- 'sent', 'delivered', 'read', 'failed'
message_id VARCHAR(64),
INDEX idx_campaign_sent (campaign_id, sent_at)
);
-- 订单归因表
CREATE TABLE wa_attributions (
order_id BIGINT PRIMARY KEY,
user_id BIGINT,
campaign_id VARCHAR(32),
message_id VARCHAR(64),
attributed_revenue DECIMAL(10,2),
attributed_at TIMESTAMP
);
四、人群圈选:用 SQL 做分层
以“高价值沉默用户”为例:
SELECT u.user_id, u.phone, u.country_code, MAX(o.order_date) AS last_order_date
FROM users u
JOIN orders o ON u.user_id = o.user_id
WHERE u.segment = 'high_value'
AND u.wa_opt_in = TRUE
AND o.order_date BETWEEN DATE_SUB(NOW(), INTERVAL 90 DAY) AND DATE_SUB(NOW(), INTERVAL 30 DAY)
GROUP BY u.user_id, u.phone, u.country_code;
关键原则:把“发给谁”的决策交给 SQL,而不是人工选名单。
五、模板引擎:Jinja2 + 变量校验
用 Jinja2 渲染带变量的消息模板,避免拼接字符串导致的安全和格式问题。
from jinja2 import Template
template_str = "Hi {{ name }}, your {{ product }} is back in stock. Grab it: {{ link }}"
template = Template(template_str)
payload = {
"name": "Alice",
"product": "Pro Plan",
"link": "https://shop.example.com/restock?u=abc123"
}
message = template.render(payload)
增加一层校验,防止变量缺失导致发送失败:
required_vars = {"name", "product", "link"}
missing = required_vars - payload.keys()
if missing:
raise ValueError(f"Missing template vars: {missing}")
六、发送层:基于 WhatsApp Business API 的批量脚本
下面是简化的发送脚本骨架。生产环境中会加上限流、重试和账号轮换。
import requests
import time
from typing import List, Dict
WHATSAPP_API_URL = "https://graph.facebook.com/v18.0/{phone_number_id}/messages"
ACCESS_TOKEN = "YOUR_ACCESS_TOKEN"
def send_template_message(phone: str, template_name: str, language: str = "en") -> Dict:
payload = {
"messaging_product": "whatsapp",
"recipient_type": "individual",
"to": phone,
"type": "template",
"template": {
"name": template_name,
"language": {"code": language}
}
}
headers = {
"Authorization": f"Bearer {ACCESS_TOKEN}",
"Content-Type": "application/json"
}
resp = requests.post(WHATSAPP_API_URL, json=payload, headers=headers)
return resp.json()
def batch_send(users: List[Dict], template_name: str, qps: int = 2):
interval = 1.0 / qps
for user in users:
try:
result = send_template_message(user["phone"], template_name)
print(f"Sent to {user['phone']}: {result.get('messages', [{}])[0].get('id')}")
except Exception as e:
print(f"Failed to send to {user['phone']}: {e}")
time.sleep(interval)
注意:直接调 API 适合有开发资源的团队。如果希望降低维护成本,也可以选择市面上成熟的第三方方案。
七、Webhook 回调:状态回写与风控
配置 Webhook 接收 messages 和 message_status 事件:
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/webhook/whatsapp", methods=["POST"])
def whatsapp_webhook():
data = request.json
for entry in data.get("entry", []):
for change in entry.get("changes", []):
value = change.get("value", {})
# 消息状态回执
statuses = value.get("statuses", [])
for status in statuses:
update_message_status(
message_id=status["id"],
status=status["status"], # sent/delivered/read/failed
timestamp=status["timestamp"]
)
# 用户回复内容
messages = value.get("messages", [])
for msg in messages:
store_inbound_message(
phone=msg["from"],
text=msg.get("text", {}).get("body", "")
)
return jsonify({"status": "ok"})
def update_message_status(message_id: str, status: str, timestamp: int):
# 更新 wa_campaign_logs 表
sql = """
UPDATE wa_campaign_logs
SET status = %s, updated_at = FROM_UNIXTIME(%s)
WHERE message_id = %s
"""
execute_sql(sql, (status, timestamp, message_id))
基于状态数据,可以实时监控送达率、阅读率和失败率。
八、归因:如何把订单回追到某条消息
在消息中带上带 UTM 参数的短链:
def build_tracking_link(user_id: int, campaign_id: str) -> str:
return f"https://shop.example.com/offer?utm_source=whatsapp&utm_campaign={campaign_id}&u={user_id}"
订单落库后,用 SQL 做归因窗口分析(默认 7 天点击归因):
SELECT
c.campaign_id,
COUNT(DISTINCT a.order_id) AS attributed_orders,
SUM(a.attributed_revenue) AS attributed_revenue,
ROUND(SUM(a.attributed_revenue) / COUNT(DISTINCT l.log_id) * 1000, 2) AS rpm
FROM wa_campaign_logs l
JOIN wa_campaigns c ON l.campaign_id = c.campaign_id
LEFT JOIN wa_attributions a ON l.message_id = a.message_id
AND a.attributed_at BETWEEN l.sent_at AND DATE_ADD(l.sent_at, INTERVAL 7 DAY)
WHERE c.sent_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)
GROUP BY c.campaign_id
ORDER BY attributed_revenue DESC;
九、实际运行效果
运行 3 个月后,核心指标如下:
| 指标 | 基线 | 优化后 |
|---|---|---|
| 日发送量 | 800 条/人/天 | 5 万条/天 |
| 消息打开率 | 人工不可统计 | 71% |
| 7 日复购转化率 | 1.2% | 8.4% |
| 账号投诉率 | 0.8% | 0.18% |
投诉率下降的关键是:分层 + 错峰发送 + 失败/退订自动熔断。
十、踩坑与建议
- 模板预审核:WhatsApp Business API 的模板需提前提交 Meta 审核,文案避免促销敏感词。
- 限流与封号:新账号先养号,初始日发送量控制在 1,000 条以内,逐步提升。
- 时区发送:根据
country_code推断时区,避免半夜打扰用户。 - 退订处理:用户回复 STOP 后,必须立刻写入黑名单并停止发送。
十一、何时该用第三方工具
如果你的团队没有专职后端开发,维护 WhatsApp API、Webhook、账号轮换的成本会很高。这种情况下,可以考虑把发送层交给成熟的群发工具,自己只保留数据分层和归因部分。
我们在对比几款方案时,注意到 WASender 这类工具在变量模板、多账号轮询和分时段发送上做得比较扎实,适合想快速跑通 WhatsApp 营销闭环的团队。
免责声明:文中代码为简化示例,生产环境请补充认证、限流、日志和异常处理;案例数据已脱敏,仅供学习参考。
更多推荐



所有评论(0)