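Building a Data Middle Platform for an E-commerce Rebate App: Real-Time Computation of User Behavior, Order Data, and Rebate Reports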

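Hi everyone, I'm 阿宝, developer of the high-commission rebate app 省赚客 APP. In rebate e-commerce, data freshness directly determines how much users trust the platform and how efficiently it can be operated. The traditional T+1 offline batch model can no longer meet users' expectation of seeing the rebate as soon as they place an order. In this post I share how we built the 省赚客 APP data middle platform, focusing on how a Flink + Kafka architecture supports user behavior tracking, order status monitoring, and millisecond-level real-time computation of rebate reports.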

Real-Time Collection and Cleaning of User Behavior with Flink

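User behavior data is the foundation for accurate user profiles and anti-fraud models. We need to capture click, view, search, and share events inside the app in real time. The pipeline starts with events reported by the client SDK, which pass through Nginx into a Kafka topic and are then cleaned by a Flink job performing real-time ETL.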

package juwatech.cn.data.behavior.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;

public class UserBehaviorEvent implements Serializable {
    private static final long serialVersionUID = 1L;

    @JsonProperty("userId")
    private Long userId;

    @JsonProperty("eventId")
    private String eventId;

    @JsonProperty("eventType")
    private String eventType; // CLICK, VIEW, SHARE, SEARCH

    @JsonProperty("itemId")
    private String itemId;

    @JsonProperty("timestamp")
    private Long timestamp;

    @JsonProperty("deviceInfo")
    private String deviceInfo;

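    // Getters and setters (Flink's POJO serializer expects accessors for every private field)
    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public String getItemId() { return itemId; }
    public void setItemId(String itemId) { this.itemId = itemId; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public String getDeviceInfo() { return deviceInfo; }
    public void setDeviceInfo(String deviceInfo) { this.deviceInfo = deviceInfo; }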
}

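In the Flink job, we read from Kafka with the FlinkKafkaConsumer connector and apply a simple filter for malformed records, so that only clean, usable data reaches downstream operators.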

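package juwatech.cn.data.behavior.flink;

import juwatech.cn.data.behavior.model.UserBehaviorEvent;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Locale;
import java.util.Properties;

public class BehaviorETLJob {

    public static DataStream<UserBehaviorEvent> createBehaviorStream(StreamExecutionEnvironment env) throws Exception {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-behavior-group");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
            "topic_user_behavior_raw",
            new SimpleStringSchema(),
            props
        );

        DataStream<String> rawStream = env.addSource(kafkaConsumer);

        // flatMap emits zero or one record per input, so bad records are
        // dropped without ever passing null to downstream operators.
        return rawStream.flatMap(new ParseAndCleanFunction());
    }

    /** Parses raw JSON, drops invalid records, and normalizes the event type. */
    static class ParseAndCleanFunction extends RichFlatMapFunction<String, UserBehaviorEvent> {
        private transient ObjectMapper mapper;

        @Override
        public void open(Configuration parameters) {
            // Create the mapper once per parallel task instead of once per record
            mapper = new ObjectMapper();
        }

        @Override
        public void flatMap(String value, Collector<UserBehaviorEvent> out) {
            try {
                UserBehaviorEvent event = mapper.readValue(value, UserBehaviorEvent.class);
                // Drop records with a missing user ID, missing event type, or invalid timestamp
                if (event == null || event.getUserId() == null || event.getEventType() == null
                        || event.getTimestamp() == null || event.getTimestamp() <= 0) {
                    return;
                }
                // Normalize the event type
                event.setEventType(event.getEventType().toUpperCase(Locale.ROOT));
                out.collect(event);
            } catch (Exception e) {
                // Unparseable JSON: log and skip
                // juwatech.cn.data.log.ErrorLogger.logParseError(e, value);
            }
        }
    }
}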
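The cleaning rules are easier to verify when they live in a plain function that can be unit-tested without a Flink cluster. The following is a minimal sketch under stated assumptions, not the production code: `BehaviorCleaner` and `CleanEvent` are hypothetical names, and the rule set (reject a missing user ID, a missing or blank event type, or a non-positive timestamp; trim and upper-case the event type) is an assumed reading of the filter described above.

```java
import java.util.Locale;
import java.util.Optional;

/** Hypothetical, dependency-free version of the cleaning rules. */
final class BehaviorCleaner {

    /** Minimal stand-in for UserBehaviorEvent that carries only the validated fields. */
    static final class CleanEvent {
        final long userId;
        final String eventType;
        final long timestamp;

        CleanEvent(long userId, String eventType, long timestamp) {
            this.userId = userId;
            this.eventType = eventType;
            this.timestamp = timestamp;
        }
    }

    /** Returns the normalized event, or Optional.empty() if the record should be dropped. */
    static Optional<CleanEvent> clean(Long userId, String eventType, Long timestamp) {
        if (userId == null || timestamp == null || timestamp <= 0) {
            return Optional.empty();
        }
        if (eventType == null || eventType.trim().isEmpty()) {
            return Optional.empty();
        }
        String normalized = eventType.trim().toUpperCase(Locale.ROOT);
        return Optional.of(new CleanEvent(userId, normalized, timestamp));
    }
}
```

A Flink operator could then call `BehaviorCleaner.clean(...)` and collect the result only when it is present, which keeps the validation logic in one testable place.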

Order Status Transitions and Real-Time Rebate Calculation

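The core of rebate calculation is determining order status accurately. A rebate only takes effect once an order moves from "paid" to "settled" (receipt confirmed and the after-sales period has passed). We use Flink's CEP (complex event processing) library to monitor changes in the order state machine.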

package juwatech.cn.data.order.calc;

import juwatech.cn.data.order.model.OrderStatusEvent;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.math.BigDecimal;
import java.util.Map;

public class RebateCalculator {

    public static DataStream<RebateResult> calculateRealTimeRebate(DataStream<OrderStatusEvent> orderStream) {
        // Pattern: payment first, then settlement within the time window.
        // followedBy (relaxed contiguity) allows intermediate statuses, such as
        // shipping or receipt confirmation, to occur between the two events.
        Pattern<OrderStatusEvent, ?> pattern = Pattern.<OrderStatusEvent>begin("paid")
            .where(new SimpleCondition<OrderStatusEvent>() {
                @Override
                public boolean filter(OrderStatusEvent event) {
                    return "PAID".equals(event.getStatus());
                }
            })
            .followedBy("settled")
            .where(new SimpleCondition<OrderStatusEvent>() {
                @Override
                public boolean filter(OrderStatusEvent event) {
                    return "SETTLED".equals(event.getStatus());
                }
            })
            .within(org.apache.flink.streaming.api.windowing.time.Time.days(15)); // 15-day automatic receipt-confirmation period

        PatternStream<OrderStatusEvent> patternStream = CEP.pattern(orderStream.keyBy(OrderStatusEvent::getOrderId), pattern);

        return patternStream.select(new PatternSelectFunction<OrderStatusEvent, RebateResult>() {
            @Override
            public RebateResult select(Map<String, OrderStatusEvent> patternMap) {
                OrderStatusEvent settledEvent = patternMap.get("settled");
                BigDecimal orderAmount = settledEvent.getOrderAmount();
                BigDecimal rebateRate = settledEvent.getRebateRate(); // joined from the product dimension table
                
                BigDecimal rebateAmount = orderAmount.multiply(rebateRate);
                
                RebateResult result = new RebateResult();
                result.setOrderId(settledEvent.getOrderId());
                result.setUserId(settledEvent.getUserId());
                result.setRebateAmount(rebateAmount);
                result.setCalcTime(System.currentTimeMillis());
                
                // juwatech.cn.data.service.RebateService.asyncSave(result);
                return result;
            }
        });
    }
}
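To make the matching semantics concrete, here is a plain-Java sketch of what the job is meant to detect for a single order: a PAID event followed, possibly with other statuses in between, by a SETTLED event inside the 15-day window, after which the rebate is order amount times rebate rate. This is an illustration under assumptions rather than the Flink implementation: `RebateSketch` and `StatusEvent` are hypothetical names, the input list is assumed to be sorted by time, and rounding to two decimals with `HALF_UP` is an assumed business rule that the original code does not specify.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.util.List;
import java.util.Optional;

/** Hypothetical single-order illustration of "PAID followed by SETTLED within 15 days". */
final class RebateSketch {

    static final Duration WINDOW = Duration.ofDays(15);

    /** Minimal stand-in for OrderStatusEvent. */
    static final class StatusEvent {
        final String status;
        final long timestampMillis;
        final BigDecimal orderAmount;
        final BigDecimal rebateRate;

        StatusEvent(String status, long timestampMillis, BigDecimal orderAmount, BigDecimal rebateRate) {
            this.status = status;
            this.timestampMillis = timestampMillis;
            this.orderAmount = orderAmount;
            this.rebateRate = rebateRate;
        }
    }

    /** Expects the status events of ONE order, sorted by timestamp. */
    static Optional<BigDecimal> rebateFor(List<StatusEvent> events) {
        StatusEvent paid = null;
        for (StatusEvent e : events) {
            if (paid == null) {
                if ("PAID".equals(e.status)) {
                    paid = e; // start of the pattern
                }
            } else if ("SETTLED".equals(e.status)) {
                long elapsed = e.timestampMillis - paid.timestampMillis;
                if (elapsed > WINDOW.toMillis()) {
                    return Optional.empty(); // settled too late: no match
                }
                // Assumed rounding rule: two decimals, HALF_UP
                return Optional.of(e.orderAmount.multiply(e.rebateRate).setScale(2, RoundingMode.HALF_UP));
            }
            // Any other status between PAID and SETTLED is skipped
        }
        return Optional.empty();
    }
}
```

For an order of 199.00 with a rebate rate of 0.05 that settles ten days after payment, the sketch yields 9.95. The same order settling after twenty days yields no rebate.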

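To obtain the current rebate rate for a product, the Flink job needs to join against a dimension table stored in Redis or HBase. Below is a simplified definition of the lookup interface.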

package juwatech.cn.data.dimension;

import java.math.BigDecimal;

public interface ProductRebateDimTable {
    /**
     * Returns the current rebate rate for a product.
     * @param productId product ID
     * @return rebate rate, e.g. 0.05 means 5%
     */
    BigDecimal getRebateRate(String productId);
    
    /**
     * Checks whether the product is on the blacklist.
     */
    boolean isBlacklisted(String productId);
}
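One possible way to implement this interface is to put a small local cache with a time-to-live in front of the remote store, so that hot products do not trigger a Redis or HBase lookup for every order event. The sketch below is an assumption, not the platform's actual implementation: `CachedRebateDimTable` is a hypothetical name, the `loader` function stands in for the remote lookup, the blacklist is a plain in-memory set, and treating an unknown product as a zero rate is an assumed policy. The interface is repeated so that the block compiles on its own.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

interface ProductRebateDimTable {
    BigDecimal getRebateRate(String productId);
    boolean isBlacklisted(String productId);
}

/** Hypothetical TTL cache in front of a remote dimension store. */
final class CachedRebateDimTable implements ProductRebateDimTable {

    private static final class Entry {
        final BigDecimal rate;
        final long loadedAtMillis;

        Entry(BigDecimal rate, long loadedAtMillis) {
            this.rate = rate;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();
    private final Function<String, BigDecimal> loader; // stands in for a Redis/HBase lookup
    private final Set<String> blacklist;
    private final long ttlMillis;

    CachedRebateDimTable(Function<String, BigDecimal> loader, Set<String> blacklist, long ttlMillis) {
        this.loader = loader;
        this.blacklist = blacklist;
        this.ttlMillis = ttlMillis;
    }

    @Override
    public BigDecimal getRebateRate(String productId) {
        long now = System.currentTimeMillis();
        Entry cached = cache.get(productId);
        if (cached != null && now - cached.loadedAtMillis < ttlMillis) {
            return cached.rate; // still fresh: skip the remote call
        }
        BigDecimal rate = loader.apply(productId);
        if (rate == null) {
            rate = BigDecimal.ZERO; // assumed policy: unknown products earn no rebate
        }
        cache.put(productId, new Entry(rate, now));
        return rate;
    }

    @Override
    public boolean isBlacklisted(String productId) {
        return blacklist.contains(productId);
    }
}
```

The TTL is the trade-off to tune: a shorter value picks up rate changes sooner, while a longer one reduces load on the dimension store.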

Real-Time Rebate Reports and Multidimensional Analysis Output

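Computed rebate results need to be written to ClickHouse or Doris in real time to support second-level refresh of the operations dashboard and the user-facing "earnings details" page. We export the results with a custom Flink sink, a RichSinkFunction that writes to ClickHouse over JDBC.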

package juwatech.cn.data.report.sink;

import juwatech.cn.data.order.calc.RebateResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class ClickHouseRebateSink extends RichSinkFunction<RebateResult> {

    private static final int BATCH_SIZE = 100;

    private Connection connection;
    private PreparedStatement insertStmt;
    private int pendingRows; // rows added to the current batch but not yet sent

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Initialize the ClickHouse connection
        String url = "jdbc:clickhouse://ch-node-01:8123/rebate_db";
        connection = DriverManager.getConnection(url, "user", "password");
        String sql = "INSERT INTO real_time_rebate_stats (user_id, order_id, amount, calc_time) VALUES (?, ?, ?, ?)";
        insertStmt = connection.prepareStatement(sql);
    }

    @Override
    public void invoke(RebateResult value, Context context) throws Exception {
        insertStmt.setLong(1, value.getUserId());
        insertStmt.setString(2, value.getOrderId());
        insertStmt.setBigDecimal(3, value.getRebateAmount());
        insertStmt.setLong(4, value.getCalcTime());
        insertStmt.addBatch();
        pendingRows++;

        // Send the batch once BATCH_SIZE rows are buffered to reduce round trips
        if (pendingRows >= BATCH_SIZE) {
            insertStmt.executeBatch();
            insertStmt.clearBatch();
            pendingRows = 0;
        }
    }

    @Override
    public void close() throws Exception {
        if (insertStmt != null) {
            insertStmt.executeBatch();
            insertStmt.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}
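Count-based batching is simple to get subtly wrong, so it can help to isolate it in a small buffer that is testable without a database. The following is a minimal sketch under assumptions: `BatchBuffer` is a hypothetical helper, and its `flusher` callback stands in for the JDBC `executeBatch()` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical buffer that hands rows to a flusher in fixed-size batches. */
final class BatchBuffer<T> {

    private final int batchSize;
    private final Consumer<List<T>> flusher; // stands in for executeBatch()
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    /** Buffers one row and flushes automatically when the batch is full. */
    void add(T row) {
        pending.add(row);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered; also meant to be called on close. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(pending)); // copy so the flusher owns its list
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A purely count-based policy can leave the last few rows waiting during quiet periods. A production sink would typically also flush on a timer or on checkpoint, which this sketch leaves out.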

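With this real-time computation architecture, 省赚客 APP reduced the average delay before a rebate is credited from hours to seconds, which greatly improved the user experience. The real-time data stream also gives the risk-control system the ability to intercept fake-order activity immediately, protecting the platform's funds.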

Copyright of this article belongs to the 省赚客 app R&D team. Please credit the source when reposting.
