---
title: 电商大促,「网站实时成交额」仪表大盘技术方案?
---
# 电商大促,「网站实时成交额」仪表大盘技术方案?
> 作者:Tom哥
>
公众号:微观技术
>
博客:[https://offercome.cn](https://offercome.cn)
>
人生理念:知道的越多,不知道的越多,努力去学
小T 同学前段时间入职一家电商公司,正赶上公司在搞一场大型促销活动,整个活动周期是 6月18日一整天,老板想看下网站的实时 GMV 和 客单价。
听说小T搞过大数据,于是把这个工作安排给了小 T。
如果你是小T, 该如何完成这项工作呢?
这是阿里每年双十一 的实时交易大屏,给人一种收获的感觉,也是向老板展示这一年成果的时刻。
离线计算能满足对大量数据 进行复杂的批量计算 ,但是要求数据在计算之前已经固定,不再发生变化。而且时间周期比较长,一般都是T+1模式
> 将 MySQL 数据库中前一天订单数据提取到离线仓库,然后通过 Hive SQL 执行一条跑批任务语句,即可计算出结果
实现起来确实很简单,但不满足老板的要求
既然要求`实时` ,那就不能采用`离线数仓` 玩法,我们需要换个思路
## 方案一(业务架构型)
可以采用常规业务架构 思路,通过 Kafka 完成系统解耦,通过开发消费任务 计算网站的交易额和客单价。
整个流程如下图所示,作为一个有多年 **CRUD** 开发经验 Boy,甚至都不用做技术方案,直接撸代码
统计任务可以用我们熟悉的 Java 代码来开发,其实就是一个普通的 MQ 异步消费任务。唯一有点挑战难度的就是统计去重后的下单客户总数
有的同学一拍脑袋,那还不简单,`Set 集合` 就可以呀
互联网的用户体量一般很大,如果上亿的数据量,Set 集合 并不友好
我们可以借鉴 `布隆过滤器` 的思想,定义一个 `BitMap` 的超长数组 ,不用担心会占用特别多内存,Tom哥做过实验,1千万的数据占用内存 1.19M,计算公式: 10000000/8/1024/1024 = 1.19M
BitMap 适用场景非常广泛,如:
> 我们希望记录自己网站上用户的上线频率,比如说,计算用户 A 上线了多少天,用户 B 上线了多少天,诸如此类,以此作为数据,从而决定让哪些用户参加 beta 测试等活动 —— 这个模式可以使用 SETBIT 和 BITCOUNT 来实现。
>
比如说,每当用户在某一天上线的时候,我们就使用 SETBIT ,以用户名作为 key ,将那天所代表的网站的上线日作为 offset 参数,并将这个 offset 上的为设置为 1 。
>
举个例子,如果今天是网站上线的第 100 天,而用户 peter 在今天阅览过网站,那么执行命令 SETBIT peter 100 1 ;如果明天 peter 也继续阅览网站,那么执行命令 SETBIT peter 101 1 ,以此类推。
>
当要计算 peter 总共以来的上线次数时,就使用 BITCOUNT 命令:执行 BITCOUNT peter ,得出的结果就是 peter 上线的总天数。
>
地址:http://doc.redisfans.com/string/bitcount.html
详细内容,可以看下之前文章:[《什么是布隆过滤器?如何解决高并发缓存穿透问题?》](https://mp.weixin.qq.com/s/3OIbp837pTAvQaRlmzU7Nw)
方案一确实能满足工作要求,性能也不错,用的技术也是我们熟悉的。但是,勇于挑战的我们是不是应该有个更好的技术方案
## 方案二(流式计算)
现在大数据技术这么普及,我们可以使用`流式计算 `来统计数据,比如 Flink 框架
技术方案与上面的流程类似,区别在与任务采用了 Flink 框架,通过`消息事件` 驱动的方式来触发任务
### 1、环境配置
安装 Kafka、Redis 等中间件,之前的一篇文章有详细介绍
> https://articles.zsxq.com/id_t9j3o8i3hb6z.html
创建一个订单 Topic,名字为 `create_order_topic` ,用来存储下单消息
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic create_order_topic_1
### 2、创建一个订单消息实体
```java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderMessage {
private Long userId; //用户id
private Long itemId; //商品id
private Double price;//订单金额
private Integer count;//购买数量
private Long time;
}
```
### 3、模拟生产端,往 Kafka 的 create_order_topic_1 主题投递消息
```java
static List orderMessageList = Lists.newArrayList();
/**
* 初始化数据
*/
static {
orderMessageList.add(new OrderMessage(1L, 170170001L, 100d, 1, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170002L, 200d, 2, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170003L, 300d, 3, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170004L, 400d, 1, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170005L, 500d, 5, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170006L, 600d, 1, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170007L, 700d, 3, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170008L, 100d, 9, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(2L, 170170009L, 100d, 5, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(3L, 170170010L, 100d, 6, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(2L, 170170011L, 100d, 9, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(2L, 170170012L, 100d, 1, System.currentTimeMillis()));
}
/**
* 模拟生产订单数据
*/
@Override
public void run(SourceContext ctx) throws Exception {
for (int i = 0; i < orderMessageList.size(); i++) {
ctx.collect(JSON.toJSONString(orderMessageList.get(i)));
//每 4 秒 产生一条数据
Thread.sleep(5000);
}
}
```
### 4、Flink 任务端接收 Kafka 订单消息,并聚合处理
首先,添加绑定 Kafka 数据源,并订阅下单消息 topic
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//采用 Event-Time 来作为 时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置 Checkpoint 时间周期为 60 秒
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);
env.setParallelism(1);
// 配置 kafka 参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(createOrderTopic, new SimpleStringSchema(), properties);
// 从最早开始消费
consumer.setStartFromLatest();
DataStream stream = env.addSource(consumer);
```
对接收的数据反序列化,转化为 `OrderMessage` 对象
```java
// 数据反序列化
DataStream orderStream = stream.map(message -> {
//System.out.println(message);
return JSON.parseObject(message, OrderMessage.class);
});
```
中国时区, 统计区间每天的 0 点~ 24 点的数据进行计算
> TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))
生产环境,一天只输出一次计算结果不太合理,我们可以通过 `trigger` 设置触发器,以一定的频率输出计算结果
> trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(4)))
> 每隔 4 秒触发一次计算,并输出中间结果
通过 `KeySelector` 按天对数据分组,并且通过 evictor 剔除已经计算过的数据,避免大量的数据堆积在内存中,把内存撑爆
```java
SingleOutputStreamOperator> single = orderStream.keyBy(new KeySelector() {
@Override
public String getKey(OrderMessage value) throws Exception {
return timeStampToDate(value.getTime());
}
})
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(4)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new GMVProcessWindowFunctionBitMap());
```
**CountEvictor 剔除有三种:**
* 1. CountEvictor:数量剔除器。在 Window 中保留指定数量的元素,并从窗口头部开始丢弃其余元素。
* 2. DeltaEvictor:阈值剔除器。计算 Window 中最后一个元素与其余每个元素之间的增量,丢弃增量大于或等于阈值的元素。
* 3. TimeEvictor:时间剔除器。保留 Window 中最近一段时间内的元素,并丢弃其余元素。
> TimeEvictor.of(Time.seconds(0), true)
> 剔除已经计算过的数据元素,释放内存
系统交易额汇总计算,下过订单的用户 id (Long 类型) 放在 `Roaring64NavigableMap` 去重,中间计算结果通过 `ValueState` 来上下传递
```java
public class GMVProcessWindowFunctionBitMap extends ProcessWindowFunction, String, TimeWindow> {
private transient ValueState gmvState;
private transient ValueState bitMapState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
gmvState = this.getRuntimeContext().getState(new ValueStateDescriptor("gmv", Double.class));
bitMapState = this.getRuntimeContext().getState(new ValueStateDescriptor("bitMap", TypeInformation.of(new TypeHint() {
})));
}
@Override
public void process(String s, Context context, Iterable elements, Collector> out) throws Exception {
Double gmv = gmvState.value();
Roaring64NavigableMap bitMap = bitMapState.value();
if (bitMap == null) {
bitMap = new Roaring64NavigableMap();
gmv = 0d;
}
Iterator iterator = elements.iterator();
while (iterator.hasNext()) {
OrderMessage orderMessage = iterator.next();
System.out.println("GMVProcessWindowFunctionBitMap = " + JSON.toJSONString(orderMessage));
gmv = gmv + orderMessage.getPrice();
Long userId = orderMessage.getUserId();
bitMap.add(userId);
gmvState.update(gmv);
bitMapState.update(bitMap);
}
out.collect(Tuple3.of(s, "userCount", String.valueOf(bitMap.getIntCardinality())));
out.collect(Tuple3.of(s, "gmv", String.valueOf(gmv)));
}
```
## 源码地址
> https://github.com/aalansehaiyang/xq_project