作者:Tom哥
公众号:微观技术
博客:https://offercome.cn
人生理念:知道的越多,不知道的越多,努力去学
电商平台大家都不陌生,即使没做过也经常使用,相信各位小伙伴都有体感。
就像微博热搜一样,电商平台有很多热点商品,比如:很多网红主播正在直播带货,瞬间带来大流量,对系统产生很大冲击。
今天,我们就来讲下关于热点商品的技术架构方案,分为几个部分
首先我们先来张整理架构图:
技术方案用到了 Kafka、Redis 中间件,先来做前置准备工作,将这些软件安装好
从 apache
官网下载对应的版本,下载地址
本文实验用的版本是 :kafka_2.12-2.6.0,对压缩包解压
tar -zxvf kafka_2.12-2.6.0.tgz
cd kafka_2.12-2.6.0
启动 ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
启动 Kafka Server
nohup bin/kafka-server-start.sh config/server.properties &
创建一个订单 Topic,名字为 create_order_topic,用来存储下单消息
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic create_order_topic
终端执行上述命令,启动成功后,可以看到如下图:
从官网上下载一个稳定的 Redis 版本
本文测试的版本是 :redis-6.0.6,压缩包解压
tar xzf redis-6.0.6.tar.gz
cd redis-6.0.6
make
进入 src 目录,在本地启动一个 Redis 单机实例
src/redis-server
启动成功后,可以看到如下图:
也可以在终端新开个窗口,测试一些 Redis 命令
定义一个订单消息实体,这里我们做下简化,只保留一些核心字段,如:用户id 、商品id 、订单金额 、购买数量 等
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderDetail implements Serializable {
private Long userId; //用户id
private Long itemId; //商品id
private Double price;//订单金额
private Long count;//购买数量
private Long timeStamp;//下单时间
}
然后,定义消息的发送端。
通常的逻辑是将订单数据先存储到本地数据库,然后发送一个异步消息,保证两个操作的事务性。
这里为了简化,只是定义了一个消息发送器,采用无限循环,模拟生成订单消息。
然后,将消息存储到了 Kafka 中
/**
* 业务系统,模拟创建下单消息
*/
public class KafkaProducer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(5000);
DataStreamSource<String> orderStream = env.addSource(new MakeOrderSourceFunction()).setParallelism(1);
// Kafka 配置参数
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
"127.0.0.1:9092", //broker列表
"create_order_topic", //topic
new SimpleStringSchema()); // 消息序列化
//写入Kafka时附加记录的事件时间戳
producer.setWriteTimestampToKafka(true);
orderStream.addSink(producer);
env.execute();
}
}
作为消费端,从 kafka 中拉取订单消息,得到字符串格式的数据流
// 配置 kafka 参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(createOrderTopic, new SimpleStringSchema(), properties);
// 从最早开始消费
consumer.setStartFromLatest();
将数据流反序列化成 OrderDetail
对象
// 数据反序列化
DataStream<OrderDetail> orderStream = stream.map(message -> {
//System.out.println(message);
return JSON.parseObject(message, OrderDetail.class);
});
按商品id 进行分组、聚合
// 按 商品id 进行聚合
KeyedStream<OrderDetail, Long> orderDetailStringKeyedStream = orderStream.keyBy(new KeySelector<OrderDetail, Long>() {
@Override
public Long getKey(OrderDetail deviceInfo) {
return deviceInfo.getItemId();
}
});
定义滑动窗口,总时长为 1 个小时,滑动窗口的最小分区为 5 秒,也就是每5秒计算一次
WindowedStream<OrderDetail, Long, TimeWindow> window =
orderDetailStringKeyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(3600), Time.seconds(5)));
接下来,需要按商品id分组后,对购买数量聚合求和
SingleOutputStreamOperator<OrderDetail> reduce = window.reduce(new ReduceFunction<OrderDetail>() {
@Override
public OrderDetail reduce(OrderDetail value1, OrderDetail value2) throws Exception {
OrderDetail orderDetail = new OrderDetail();
orderDetail.setItemId(value1.getItemId());
orderDetail.setCount(value1.getCount() + value2.getCount());
System.out.println("聚合结果 ===" + JSON.toJSONString(orderDetail));
return orderDetail;
}
});
业务系统通常是直接面向 C 端用户的,访问量一般很大
我们需要有一种中间介质,用来中转 Flink 实时计算出来的热点商品 id 数据
这里我们选择了 Redis
那么 Flink 如何将聚合后的 SingleOutputStreamOperator
数据流写入到 Redis ,代码如下:
// Redis 配置
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();
reduce.addSink(new RedisSink<>(conf, new RedisMapper<OrderDetail>() {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET);
}
/**
* 设置Key
*/
@Override
public String getKeyFromData(OrderDetail data) {
return "sku_id=" + String.valueOf(data.getItemId());
}
/**
* 设置value
*/
@Override
public String getValueFromData(OrderDetail data) {
return String.valueOf(data.getCount());
}
}));
通过 Redis的管理工具(Redis Desktop Manager),查询缓存中 **sku_id = 100**
对应的数据