电商返利平台数据中台建设:用户行为、订单数据、返利报表实时计算
在返利电商领域,数据的时效性直接决定了用户的信任度与平台的运营效率。今天,我将深入分享省赚客APP数据中台的建设实践,重点解析如何利用Flink + Kafka架构实现用户行为追踪、订单状态流转监控以及返利报表的毫秒级实时计算。返利计算的核心在于订单状态的准确判定。通过这套实时计算架构,省赚客APP将返利到账的平均延迟从小时级降低至秒级,极大地提升了用户体验。在Flink作业中,我们使用FlinkKafkaConsumer对接Kafka,并对异常数据进行实时过滤。
电商返利平台数据中台建设:用户行为、订单数据、返利报表实时计算
大家好,我是高佣返利省赚客APP研发者阿宝!在返利电商领域,数据的时效性直接决定了用户的信任度与平台的运营效率。传统的T+1离线计算模式已无法满足用户对“下单即见返利”的期待。今天,我将深入分享省赚客APP数据中台的建设实践,重点解析如何利用Flink + Kafka架构实现用户行为追踪、订单状态流转监控以及返利报表的毫秒级实时计算。
基于Flink的用户行为实时采集与清洗
用户行为数据是构建精准画像和反欺诈模型的基础。我们需要实时捕获APP内的点击、浏览、搜索及分享事件。数据链路起始于客户端SDK上报,经由Nginx落入Kafka Topic,再由Flink作业进行实时ETL清洗。
package juwatech.cn.data.behavior.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
/**
 * A single user behavior event reported by the client SDK, deserialized from JSON via Jackson.
 *
 * <p>Every field has a public getter and setter, and the class keeps the implicit public
 * no-argument constructor. Together these satisfy Flink's POJO rules, so Flink can use its POJO
 * serializer instead of falling back to generic serialization. They also let downstream
 * operators read every field.
 */
public class UserBehaviorEvent implements Serializable {
    private static final long serialVersionUID = 1L;

    @JsonProperty("userId")
    private Long userId;

    @JsonProperty("eventId")
    private String eventId;

    /** Event type; expected values include CLICK, VIEW, SHARE, SEARCH. */
    @JsonProperty("eventType")
    private String eventType;

    @JsonProperty("itemId")
    private String itemId;

    /** Event time as reported by the client; presumably epoch milliseconds — TODO confirm. */
    @JsonProperty("timestamp")
    private Long timestamp;

    @JsonProperty("deviceInfo")
    private String deviceInfo;

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }

    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }

    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }

    public String getItemId() { return itemId; }
    public void setItemId(String itemId) { this.itemId = itemId; }

    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    public String getDeviceInfo() { return deviceInfo; }
    public void setDeviceInfo(String deviceInfo) { this.deviceInfo = deviceInfo; }
}
在Flink作业中,我们使用Flink内置的FlinkKafkaConsumer对接Kafka,并实现了简单的异常数据过滤逻辑(解析失败或缺少关键字段的记录会被丢弃),确保进入下游的数据干净可用。
package juwatech.cn.data.behavior.flink;
import juwatech.cn.data.behavior.model.UserBehaviorEvent;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
/**
 * Real-time ETL for raw user behavior events read from Kafka.
 */
public class BehaviorETLJob {

    /**
     * Builds the cleaned user-behavior stream from the raw Kafka topic.
     *
     * <p>Each Kafka record is parsed as JSON into a {@link UserBehaviorEvent}. The following
     * records are dropped:
     * <ul>
     *   <li>records that fail to parse;</li>
     *   <li>records with no userId, timestamp or eventType.</li>
     * </ul>
     * The event type is upper-cased with {@link java.util.Locale#ROOT}, so the result does not
     * depend on the JVM default locale. For example, "click" always becomes "CLICK", never "CLİCK".
     *
     * @param env the execution environment the Kafka source is attached to
     * @return a stream containing only parsed, validated and normalized events
     * @throws Exception propagated from source construction
     */
    public static DataStream<UserBehaviorEvent> createBehaviorStream(StreamExecutionEnvironment env) throws Exception {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-behavior-group");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "topic_user_behavior_raw",
                new SimpleStringSchema(),
                props
        );
        DataStream<String> rawStream = env.addSource(kafkaConsumer);
        // Captured by the lambda below; the lambda (and therefore the mapper) is serialized with the job.
        ObjectMapper mapper = new ObjectMapper();
        // NOTE(review): invalid records are signalled by emitting null from map() and removed by the
        // following filter(). A flatMap that simply does not collect invalid records would not
        // depend on null records passing between operators.
        return rawStream.map(value -> {
            try {
                UserBehaviorEvent event = mapper.readValue(value, UserBehaviorEvent.class);
                // Drop events missing the fields downstream processing relies on.
                if (event.getUserId() == null
                        || event.getTimestamp() == null
                        || event.getEventType() == null) {
                    return null;
                }
                // Locale.ROOT keeps the normalization independent of the JVM default locale.
                event.setEventType(event.getEventType().toUpperCase(java.util.Locale.ROOT));
                return event;
            } catch (Exception e) {
                // Deliberate best-effort: a malformed record is dropped instead of failing the job.
                // TODO: report parse failures (log or metric) so data-quality issues stay visible.
                return null;
            }
        }).filter(java.util.Objects::nonNull);
    }
}
订单状态流转与返利金额实时计算
返利计算的核心在于订单状态的准确判定。只有当订单从“已支付”流转到“已结算”(确认收货且过售后期)时,返利才真正生效。我们利用Flink的CEP(复杂事件处理)库来监测订单状态机的变化。
package juwatech.cn.data.order.calc;
import juwatech.cn.data.order.model.OrderStatusEvent;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.math.BigDecimal;
import java.util.Map;
/**
 * Computes rebates from the order status stream with Flink CEP.
 */
public class RebateCalculator {

    /**
     * Emits one {@link RebateResult} for every order whose PAID event is followed by a SETTLED
     * event within 15 days.
     *
     * <p>The input is keyed by order id, so the pattern is evaluated per order. Other status
     * events may occur between PAID and SETTLED without breaking the match. The rebate amount is
     * {@code orderAmount * rebateRate}, with both values taken from the SETTLED event.
     *
     * @param orderStream status-change events for all orders
     * @return a stream with one rebate result per matched PAID → SETTLED sequence
     */
    public static DataStream<RebateResult> calculateRealTimeRebate(DataStream<OrderStatusEvent> orderStream) {
        // Relaxed contiguity (followedBy): orders usually pass through intermediate states between
        // PAID and SETTLED. Strict contiguity (next) would discard every such order.
        Pattern<OrderStatusEvent, ?> pattern = Pattern.<OrderStatusEvent>begin("paid")
                .where(new SimpleCondition<OrderStatusEvent>() {
                    @Override
                    public boolean filter(OrderStatusEvent event) {
                        return "PAID".equals(event.getStatus());
                    }
                })
                .followedBy("settled")
                .where(new SimpleCondition<OrderStatusEvent>() {
                    @Override
                    public boolean filter(OrderStatusEvent event) {
                        return "SETTLED".equals(event.getStatus());
                    }
                })
                // 15-day auto-confirm period. NOTE(review): settlement also waits for the
                // after-sales period; confirm this window is long enough, otherwise matches time out.
                .within(org.apache.flink.cep.time.Time.days(15));
        PatternStream<OrderStatusEvent> patternStream =
                CEP.pattern(orderStream.keyBy(OrderStatusEvent::getOrderId), pattern);
        return patternStream.select(new PatternSelectFunction<OrderStatusEvent, RebateResult>() {
            @Override
            public RebateResult select(Map<String, java.util.List<OrderStatusEvent>> patternMap) {
                // Each stage matches a single event, so its list holds exactly one element.
                OrderStatusEvent settledEvent = patternMap.get("settled").get(0);
                // Fail with a descriptive message instead of an anonymous NPE inside multiply().
                BigDecimal orderAmount = java.util.Objects.requireNonNull(
                        settledEvent.getOrderAmount(),
                        () -> "orderAmount is null for order " + settledEvent.getOrderId());
                // Rate comes from the product dimension-table join; null presumably means a lookup miss.
                BigDecimal rebateRate = java.util.Objects.requireNonNull(
                        settledEvent.getRebateRate(),
                        () -> "rebateRate is null for order " + settledEvent.getOrderId());
                // NOTE(review): the product is not rounded. Its scale is the sum of the operand
                // scales (e.g. 99.90 * 0.05 = 4.9950). Confirm the monetary scale and rounding mode.
                BigDecimal rebateAmount = orderAmount.multiply(rebateRate);
                RebateResult result = new RebateResult();
                result.setOrderId(settledEvent.getOrderId());
                result.setUserId(settledEvent.getUserId());
                result.setRebateAmount(rebateAmount);
                // Processing time at which the match was emitted.
                result.setCalcTime(System.currentTimeMillis());
                return result;
            }
        });
    }
}
为了获取实时的商品返利比例,我们需要在Flink中关联Redis或HBase维表。这里展示一个简化的维表查询接口定义。
package juwatech.cn.data.dimension;
import java.math.BigDecimal;
/**
 * Lookup interface for per-product rebate configuration.
 *
 * <p>This interface does not specify the result for an unknown {@code productId}; consult the
 * implementation.
 */
public interface ProductRebateDimTable {

    /**
     * Reports whether a product is on the blacklist.
     *
     * @param productId identifier of the product to check
     * @return {@code true} if the product is blacklisted, {@code false} otherwise
     */
    boolean isBlacklisted(String productId);

    /**
     * Looks up the current rebate ratio of a product.
     *
     * @param productId identifier of the product
     * @return the rebate ratio as a fraction, e.g. {@code 0.05} means 5%
     */
    BigDecimal getRebateRate(String productId);
}
实时返利报表与多维分析输出
计算完成的返利数据需要实时写入ClickHouse或Doris,以支撑运营大屏和用户端“收益明细”的秒级刷新。我们通过一个基于JDBC批量写入的自定义Flink Sink,将逐笔计算出的返利结果导出到ClickHouse。
package juwatech.cn.data.report.sink;
import juwatech.cn.data.order.calc.RebateResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * Writes rebate results to ClickHouse over JDBC in fixed-size batches.
 *
 * <p>NOTE(review): rows buffered in the current batch are written only when the batch is full or
 * on {@link #close()}. Rows buffered at the moment of a failure are therefore lost. Flushing from a
 * checkpoint hook would be needed for at-least-once delivery.
 */
public class ClickHouseRebateSink extends RichSinkFunction<RebateResult> {
    private static final long serialVersionUID = 1L;

    /** Default JDBC URL of the rebate database. */
    private static final String DEFAULT_URL = "jdbc:clickhouse://ch-node-01:8123/rebate_db";
    /** Default number of buffered rows that triggers a batch execution. */
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final String INSERT_SQL =
            "INSERT INTO real_time_rebate_stats (user_id, order_id, amount, calc_time) VALUES (?, ?, ?, ?)";

    private final String url;
    private final String user;
    private final String password;
    private final int batchSize;

    // JDBC handles are created per task in open() and are not serializable, hence transient.
    private transient Connection connection;
    private transient PreparedStatement insertStmt;
    /** Rows added to the current JDBC batch that have not been executed yet. */
    private transient int pendingRows;

    /** Creates a sink with the built-in connection settings and a batch size of 100. */
    public ClickHouseRebateSink() {
        // NOTE(review): hard-coded credentials; prefer passing them via the 4-argument constructor.
        this(DEFAULT_URL, "user", "password", DEFAULT_BATCH_SIZE);
    }

    /**
     * Creates a sink with explicit connection settings.
     *
     * @param url       JDBC URL of the ClickHouse database
     * @param user      database user
     * @param password  database password
     * @param batchSize number of rows buffered before a batch is executed; must be positive
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public ClickHouseRebateSink(String url, String user, String password, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        this.url = url;
        this.user = user;
        this.password = password;
        this.batchSize = batchSize;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection(url, user, password);
        insertStmt = connection.prepareStatement(INSERT_SQL);
        pendingRows = 0;
    }

    @Override
    public void invoke(RebateResult value, Context context) throws Exception {
        insertStmt.setLong(1, value.getUserId());
        insertStmt.setString(2, value.getOrderId());
        insertStmt.setBigDecimal(3, value.getRebateAmount());
        insertStmt.setLong(4, value.getCalcTime());
        insertStmt.addBatch();
        pendingRows++;
        // Flush by row count so each batch is bounded regardless of time characteristics.
        // Watermark-derived triggers never fire in processing time (the watermark stays Long.MIN_VALUE).
        if (pendingRows >= batchSize) {
            flush();
        }
    }

    @Override
    public void close() throws Exception {
        // try-with-resources closes the statement and then the connection, even if the final flush
        // throws. Null handles (open() failed early) are skipped.
        try (Connection conn = connection; PreparedStatement stmt = insertStmt) {
            flush();
        } finally {
            insertStmt = null;
            connection = null;
            super.close();
        }
    }

    /** Executes and clears the pending JDBC batch; does nothing when no rows are buffered. */
    private void flush() throws java.sql.SQLException {
        if (insertStmt == null || pendingRows == 0) {
            return;
        }
        insertStmt.executeBatch();
        insertStmt.clearBatch();
        pendingRows = 0;
    }
}
通过这套实时计算架构,省赚客APP将返利到账的平均延迟从小时级降低至秒级,极大地提升了用户体验。同时,实时数据流也为风控系统提供了即时拦截刷单行为的能力,保障了平台资金安全。
本文著作权归 省赚客app 研发团队,转载请注明出处!
更多推荐

所有评论(0)