作者:Tom哥
公众号:微观技术
博客:https://offercome.cn
人生理念:知道的越多,不知道的越多,努力去学
小T 同学前段时间入职一家电商公司,正赶上公司在搞一场大型促销活动,整个活动周期是 6月18日一整天,老板想看下网站的实时 GMV 和 客单价。
听说小T搞过大数据,于是把这个工作安排给了小 T。
如果你是小T, 该如何完成这项工作呢?
这是阿里每年双十一 的实时交易大屏,给人一种收获的感觉,也是向老板展示这一年成果的时刻。
离线计算能满足对大量数据 进行复杂的批量计算 ,但是要求数据在计算之前已经固定,不再发生变化。而且时间周期比较长,一般都是T+1模式
将 MySQL 数据库中前一天订单数据提取到离线仓库,然后通过 Hive SQL 执行一条跑批任务语句,即可计算出结果
实现起来确实很简单,但不满足老板的要求
既然要求实时
,那就不能采用离线数仓
玩法,我们需要换个思路
可以采用常规业务架构 思路,通过 Kafka 完成系统解耦,通过开发消费任务 计算网站的交易额和客单价。
整个流程如下图所示,作为一个有多年 CRUD 开发经验 Boy,甚至都不用做技术方案,直接撸代码
统计任务可以用我们熟悉的 Java 代码来开发,其实就是一个普通的 MQ 异步消费任务。唯一有点挑战难度的就是统计去重后的下单客户总数
有的同学一拍脑袋,那还不简单,Set 集合
就可以呀
互联网的用户体量一般很大,如果上亿的数据量,Set 集合 并不友好
我们可以借鉴 布隆过滤器
的思想,定义一个 BitMap
的超长数组 ,不用担心会占用特别多内存,Tom哥做过实验,1千万的数据占用内存 1.19M,计算公式: 10000000/8/1024/1024 = 1.19M
BitMap 适用场景非常广泛,如:
我们希望记录自己网站上用户的上线频率,比如说,计算用户 A 上线了多少天,用户 B 上线了多少天,诸如此类,以此作为数据,从而决定让哪些用户参加 beta 测试等活动 —— 这个模式可以使用 SETBIT 和 BITCOUNT 来实现。
比如说,每当用户在某一天上线的时候,我们就使用 SETBIT ,以用户名作为 key ,将那天所代表的网站的上线日作为 offset 参数,并将这个 offset 上的为设置为 1 。
举个例子,如果今天是网站上线的第 100 天,而用户 peter 在今天阅览过网站,那么执行命令 SETBIT peter 100 1 ;如果明天 peter 也继续阅览网站,那么执行命令 SETBIT peter 101 1 ,以此类推。
当要计算 peter 总共以来的上线次数时,就使用 BITCOUNT 命令:执行 BITCOUNT peter ,得出的结果就是 peter 上线的总天数。
地址:http://doc.redisfans.com/string/bitcount.html
详细内容,可以看下之前文章:《什么是布隆过滤器?如何解决高并发缓存穿透问题?》
方案一确实能满足工作要求,性能也不错,用的技术也是我们熟悉的。但是,勇于挑战的我们是不是应该有个更好的技术方案
现在大数据技术这么普及,我们可以使用流式计算
来统计数据,比如 Flink 框架
技术方案与上面的流程类似,区别在与任务采用了 Flink 框架,通过消息事件
驱动的方式来触发任务
安装 Kafka、Redis 等中间件,之前的一篇文章有详细介绍
创建一个订单 Topic,名字为 create_order_topic
,用来存储下单消息
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic create_order_topic_1
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderMessage {
private Long userId; //用户id
private Long itemId; //商品id
private Double price;//订单金额
private Integer count;//购买数量
private Long time;
}
static List<OrderMessage> orderMessageList = Lists.newArrayList();
/**
* 初始化数据
*/
static {
orderMessageList.add(new OrderMessage(1L, 170170001L, 100d, 1, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170002L, 200d, 2, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170003L, 300d, 3, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170004L, 400d, 1, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170005L, 500d, 5, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170006L, 600d, 1, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170007L, 700d, 3, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170008L, 100d, 9, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(2L, 170170009L, 100d, 5, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(3L, 170170010L, 100d, 6, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(2L, 170170011L, 100d, 9, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(2L, 170170012L, 100d, 1, System.currentTimeMillis()));
}
/**
* 模拟生产订单数据
*/
@Override
public void run(SourceContext<String> ctx) throws Exception {
for (int i = 0; i < orderMessageList.size(); i++) {
ctx.collect(JSON.toJSONString(orderMessageList.get(i)));
//每 4 秒 产生一条数据
Thread.sleep(5000);
}
}
首先,添加绑定 Kafka 数据源,并订阅下单消息 topic
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//采用 Event-Time 来作为 时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置 Checkpoint 时间周期为 60 秒
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);
env.setParallelism(1);
// 配置 kafka 参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(createOrderTopic, new SimpleStringSchema(), properties);
// 从最早开始消费
consumer.setStartFromLatest();
DataStream<String> stream = env.addSource(consumer);
对接收的数据反序列化,转化为 OrderMessage
对象
// 数据反序列化
DataStream<OrderMessage> orderStream = stream.map(message -> {
//System.out.println(message);
return JSON.parseObject(message, OrderMessage.class);
});
中国时区, 统计区间每天的 0 点~ 24 点的数据进行计算
TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))
生产环境,一天只输出一次计算结果不太合理,我们可以通过 trigger
设置触发器,以一定的频率输出计算结果
trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(4))) 每隔 4 秒触发一次计算,并输出中间结果
通过 KeySelector
按天对数据分组,并且通过 evictor 剔除已经计算过的数据,避免大量的数据堆积在内存中,把内存撑爆
SingleOutputStreamOperator<Tuple3<String, String, String>> single = orderStream.keyBy(new KeySelector<OrderMessage, String>() {
@Override
public String getKey(OrderMessage value) throws Exception {
return timeStampToDate(value.getTime());
}
})
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(4)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new GMVProcessWindowFunctionBitMap());
CountEvictor 剔除有三种:
TimeEvictor.of(Time.seconds(0), true) 剔除已经计算过的数据元素,释放内存
系统交易额汇总计算,下过订单的用户 id (Long 类型) 放在 Roaring64NavigableMap
去重,中间计算结果通过 ValueState
来上下传递
public class GMVProcessWindowFunctionBitMap extends ProcessWindowFunction<OrderMessage, Tuple3<String, String, String>, String, TimeWindow> {
private transient ValueState<Double> gmvState;
private transient ValueState<Roaring64NavigableMap> bitMapState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
gmvState = this.getRuntimeContext().getState(new ValueStateDescriptor<Double>("gmv", Double.class));
bitMapState = this.getRuntimeContext().getState(new ValueStateDescriptor("bitMap", TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {
})));
}
@Override
public void process(String s, Context context, Iterable<OrderMessage> elements, Collector<Tuple3<String, String, String>> out) throws Exception {
Double gmv = gmvState.value();
Roaring64NavigableMap bitMap = bitMapState.value();
if (bitMap == null) {
bitMap = new Roaring64NavigableMap();
gmv = 0d;
}
Iterator<OrderMessage> iterator = elements.iterator();
while (iterator.hasNext()) {
OrderMessage orderMessage = iterator.next();
System.out.println("GMVProcessWindowFunctionBitMap = " + JSON.toJSONString(orderMessage));
gmv = gmv + orderMessage.getPrice();
Long userId = orderMessage.getUserId();
bitMap.add(userId);
gmvState.update(gmv);
bitMapState.update(bitMap);
}
out.collect(Tuple3.of(s, "userCount", String.valueOf(bitMap.getIntCardinality())));
out.collect(Tuple3.of(s, "gmv", String.valueOf(gmv)));
}