--- title: 电商大促,「网站实时成交额」仪表大盘技术方案? --- # 电商大促,「网站实时成交额」仪表大盘技术方案? > 作者:Tom哥 >
公众号:微观技术 >
博客:[https://offercome.cn](https://offercome.cn) >
人生理念:知道的越多,不知道的越多,努力去学 小T 同学前段时间入职一家电商公司,正赶上公司在搞一场大型促销活动,整个活动周期是 6月18日一整天,老板想看下网站的实时 GMV 和 客单价。 听说小T搞过大数据,于是把这个工作安排给了小 T。 如果你是小T, 该如何完成这项工作呢?
这是阿里每年双十一 的实时交易大屏,给人一种收获的感觉,也是向老板展示这一年成果的时刻。 离线计算能满足对大量数据 进行复杂的批量计算 ,但是要求数据在计算之前已经固定,不再发生变化。而且时间周期比较长,一般都是T+1模式 > 将 MySQL 数据库中前一天订单数据提取到离线仓库,然后通过 Hive SQL 执行一条跑批任务语句,即可计算出结果 实现起来确实很简单,但不满足老板的要求
既然要求`实时` ,那就不能采用`离线数仓` 玩法,我们需要换个思路 ## 方案一(业务架构型) 可以采用常规业务架构 思路,通过 Kafka 完成系统解耦,通过开发消费任务 计算网站的交易额和客单价。 整个流程如下图所示,作为一个有多年 **CRUD** 开发经验 Boy,甚至都不用做技术方案,直接撸代码
统计任务可以用我们熟悉的 Java 代码来开发,其实就是一个普通的 MQ 异步消费任务。唯一有点挑战难度的就是统计去重后的下单客户总数 有的同学一拍脑袋,那还不简单,`Set 集合` 就可以呀 互联网的用户体量一般很大,如果上亿的数据量,Set 集合 并不友好 我们可以借鉴 `布隆过滤器` 的思想,定义一个 `BitMap` 的超长数组 ,不用担心会占用特别多内存,Tom哥做过实验,1千万的数据占用内存 1.19M,计算公式: 10000000/8/1024/1024 = 1.19M BitMap 适用场景非常广泛,如: > 我们希望记录自己网站上用户的上线频率,比如说,计算用户 A 上线了多少天,用户 B 上线了多少天,诸如此类,以此作为数据,从而决定让哪些用户参加 beta 测试等活动 —— 这个模式可以使用 SETBIT 和 BITCOUNT 来实现。 >
比如说,每当用户在某一天上线的时候,我们就使用 SETBIT ,以用户名作为 key ,将那天所代表的网站的上线日作为 offset 参数,并将这个 offset 上的为设置为 1 。 >
举个例子,如果今天是网站上线的第 100 天,而用户 peter 在今天阅览过网站,那么执行命令 SETBIT peter 100 1 ;如果明天 peter 也继续阅览网站,那么执行命令 SETBIT peter 101 1 ,以此类推。 >
当要计算 peter 总共以来的上线次数时,就使用 BITCOUNT 命令:执行 BITCOUNT peter ,得出的结果就是 peter 上线的总天数。 >
地址:http://doc.redisfans.com/string/bitcount.html 详细内容,可以看下之前文章:[《什么是布隆过滤器?如何解决高并发缓存穿透问题?》](https://mp.weixin.qq.com/s/3OIbp837pTAvQaRlmzU7Nw) 方案一确实能满足工作要求,性能也不错,用的技术也是我们熟悉的。但是,勇于挑战的我们是不是应该有个更好的技术方案
## 方案二(流式计算) 现在大数据技术这么普及,我们可以使用`流式计算 `来统计数据,比如 Flink 框架 技术方案与上面的流程类似,区别在与任务采用了 Flink 框架,通过`消息事件` 驱动的方式来触发任务
### 1、环境配置 安装 Kafka、Redis 等中间件,之前的一篇文章有详细介绍 > https://articles.zsxq.com/id_t9j3o8i3hb6z.html 创建一个订单 Topic,名字为 `create_order_topic` ,用来存储下单消息 > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic create_order_topic_1 ### 2、创建一个订单消息实体 ```java @Data @AllArgsConstructor @NoArgsConstructor public class OrderMessage { private Long userId; //用户id private Long itemId; //商品id private Double price;//订单金额 private Integer count;//购买数量 private Long time; } ``` ### 3、模拟生产端,往 Kafka 的 create_order_topic_1 主题投递消息 ```java static List orderMessageList = Lists.newArrayList(); /** * 初始化数据 */ static { orderMessageList.add(new OrderMessage(1L, 170170001L, 100d, 1, System.currentTimeMillis())); orderMessageList.add(new OrderMessage(1L, 170170002L, 200d, 2, System.currentTimeMillis())); orderMessageList.add(new OrderMessage(1L, 170170003L, 300d, 3, System.currentTimeMillis())); orderMessageList.add(new OrderMessage(1L, 170170004L, 400d, 1, System.currentTimeMillis())); orderMessageList.add(new OrderMessage(1L, 170170005L, 500d, 5, System.currentTimeMillis())); orderMessageList.add(new OrderMessage(1L, 170170006L, 600d, 1, System.currentTimeMillis())); orderMessageList.add(new OrderMessage(1L, 170170007L, 700d, 3, System.currentTimeMillis())); orderMessageList.add(new OrderMessage(1L, 170170008L, 100d, 9, System.currentTimeMillis())); orderMessageList.add(new OrderMessage(2L, 170170009L, 100d, 5, System.currentTimeMillis())); orderMessageList.add(new OrderMessage(3L, 170170010L, 100d, 6, System.currentTimeMillis())); orderMessageList.add(new OrderMessage(2L, 170170011L, 100d, 9, System.currentTimeMillis())); orderMessageList.add(new OrderMessage(2L, 170170012L, 100d, 1, System.currentTimeMillis())); } /** * 模拟生产订单数据 */ @Override public void run(SourceContext ctx) throws Exception { for (int i = 0; i < orderMessageList.size(); i++) { ctx.collect(JSON.toJSONString(orderMessageList.get(i))); //每 4 秒 产生一条数据 Thread.sleep(5000); } } ``` ### 4、Flink 任务端接收 Kafka 订单消息,并聚合处理 首先,添加绑定 Kafka 数据源,并订阅下单消息 topic ```java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //采用 Event-Time 来作为 时间特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置 Checkpoint 时间周期为 60 秒 env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(30 * 1000); env.setParallelism(1); // 配置 kafka 参数 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(createOrderTopic, new SimpleStringSchema(), properties); // 从最早开始消费 consumer.setStartFromLatest(); DataStream stream = env.addSource(consumer); ``` 对接收的数据反序列化,转化为 `OrderMessage` 对象 ```java // 数据反序列化 DataStream orderStream = stream.map(message -> { //System.out.println(message); return JSON.parseObject(message, OrderMessage.class); }); ``` 中国时区, 统计区间每天的 0 点~ 24 点的数据进行计算 > TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))
生产环境,一天只输出一次计算结果不太合理,我们可以通过 `trigger` 设置触发器,以一定的频率输出计算结果 > trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(4))) > 每隔 4 秒触发一次计算,并输出中间结果 通过 `KeySelector` 按天对数据分组,并且通过 evictor 剔除已经计算过的数据,避免大量的数据堆积在内存中,把内存撑爆 ```java SingleOutputStreamOperator> single = orderStream.keyBy(new KeySelector() { @Override public String getKey(OrderMessage value) throws Exception { return timeStampToDate(value.getTime()); } }) .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(4))) .evictor(TimeEvictor.of(Time.seconds(0), true)) .process(new GMVProcessWindowFunctionBitMap()); ``` **CountEvictor 剔除有三种:** * 1. CountEvictor:数量剔除器。在 Window 中保留指定数量的元素,并从窗口头部开始丢弃其余元素。 * 2. DeltaEvictor:阈值剔除器。计算 Window 中最后一个元素与其余每个元素之间的增量,丢弃增量大于或等于阈值的元素。 * 3. TimeEvictor:时间剔除器。保留 Window 中最近一段时间内的元素,并丢弃其余元素。 > TimeEvictor.of(Time.seconds(0), true) > 剔除已经计算过的数据元素,释放内存 系统交易额汇总计算,下过订单的用户 id (Long 类型) 放在 `Roaring64NavigableMap` 去重,中间计算结果通过 `ValueState` 来上下传递 ```java public class GMVProcessWindowFunctionBitMap extends ProcessWindowFunction, String, TimeWindow> { private transient ValueState gmvState; private transient ValueState bitMapState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); gmvState = this.getRuntimeContext().getState(new ValueStateDescriptor("gmv", Double.class)); bitMapState = this.getRuntimeContext().getState(new ValueStateDescriptor("bitMap", TypeInformation.of(new TypeHint() { }))); } @Override public void process(String s, Context context, Iterable elements, Collector> out) throws Exception { Double gmv = gmvState.value(); Roaring64NavigableMap bitMap = bitMapState.value(); if (bitMap == null) { bitMap = new Roaring64NavigableMap(); gmv = 0d; } Iterator iterator = elements.iterator(); while (iterator.hasNext()) { OrderMessage orderMessage = iterator.next(); System.out.println("GMVProcessWindowFunctionBitMap = " + JSON.toJSONString(orderMessage)); gmv = gmv + orderMessage.getPrice(); Long userId = orderMessage.getUserId(); bitMap.add(userId); gmvState.update(gmv); bitMapState.update(bitMap); } out.collect(Tuple3.of(s, "userCount", String.valueOf(bitMap.getIntCardinality()))); out.collect(Tuple3.of(s, "gmv", String.valueOf(gmv))); } ``` ## 源码地址 > https://github.com/aalansehaiyang/xq_project