--- title: 电商平台的热点商品架构方案 --- # 电商平台的热点商品架构方案 > 作者:Tom哥 >
公众号:微观技术 >
博客:[https://offercome.cn](https://offercome.cn) >
人生理念:知道的越多,不知道的越多,努力去学 电商平台大家都不陌生,即使没做过也经常使用,相信各位小伙伴都有体感。 就像微博热搜一样,电商平台有很多热点商品,比如:很多网红主播正在直播带货,瞬间带来大流量,对系统产生很大冲击。 今天,我们就来讲下关于热点商品的技术架构方案,分为几个部分 1. 收集交易系统的订单数据 2. 计算出热点商品 3. 将计算结果推送给业务系统 ## 一、整体架构 首先我们先来张整理架构图:
## 二、配置环境 技术方案用到了 Kafka、Redis 中间件,先来做前置准备工作,将这些软件安装好 ### 1、安装 Kafka 从 `apache` 官网下载对应的版本,下载地址 > https://kafka.apache.org/downloads 本文实验用的版本是 :kafka_2.12-2.6.0,对压缩包解压 > tar -zxvf kafka_2.12-2.6.0.tgz >
cd kafka_2.12-2.6.0 启动 ZooKeeper > nohup bin/zookeeper-server-start.sh config/zookeeper.properties & 启动 Kafka Server > nohup bin/kafka-server-start.sh config/server.properties & 创建一个订单 Topic,名字为 create_order_topic,用来存储下单消息 > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic create_order_topic 终端执行上述命令,启动成功后,可以看到如下图:
### 2、安装 Redis 从官网上下载一个稳定的 Redis 版本 > http://download.redis.io/releases/ 本文测试的版本是 :redis-6.0.6,压缩包解压 > tar xzf redis-6.0.6.tar.gz >
cd redis-6.0.6 >
make 进入 src 目录,在本地启动一个 Redis 单机实例 > src/redis-server 启动成功后,可以看到如下图:
也可以在终端新开个窗口,测试一些 Redis 命令
## 三、技术方案(含代码) ### 1、收集交易系统的订单数据 定义一个订单消息实体,这里我们做下简化,只保留一些核心字段,如:用户id 、商品id 、订单金额 、购买数量 等 ```java @Data @AllArgsConstructor @NoArgsConstructor public class OrderDetail implements Serializable { private Long userId; //用户id private Long itemId; //商品id private Double price;//订单金额 private Long count;//购买数量 private Long timeStamp;//下单时间 } ``` 然后,定义消息的发送端。 通常的逻辑是将订单数据先存储到本地数据库,然后发送一个异步消息,保证两个操作的事务性。 这里为了简化,只是定义了一个消息发送器,采用无限循环,模拟生成订单消息。 然后,将消息存储到了 Kafka 中 ```java /** * 业务系统,模拟创建下单消息 */ public class KafkaProducer { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.enableCheckpointing(5000); DataStreamSource orderStream = env.addSource(new MakeOrderSourceFunction()).setParallelism(1); // Kafka 配置参数 FlinkKafkaProducer producer = new FlinkKafkaProducer( "127.0.0.1:9092", //broker列表 "create_order_topic", //topic new SimpleStringSchema()); // 消息序列化 //写入Kafka时附加记录的事件时间戳 producer.setWriteTimestampToKafka(true); orderStream.addSink(producer); env.execute(); } } ``` ### 2、计算出热点商品 作为消费端,从 kafka 中拉取订单消息,得到字符串格式的数据流 ``` // 配置 kafka 参数 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(createOrderTopic, new SimpleStringSchema(), properties); // 从最早开始消费 consumer.setStartFromLatest(); ``` 将数据流反序列化成 `OrderDetail` 对象 ``` // 数据反序列化 DataStream orderStream = stream.map(message -> { //System.out.println(message); return JSON.parseObject(message, OrderDetail.class); }); ``` 按商品id 进行分组、聚合 ``` // 按 商品id 进行聚合 KeyedStream orderDetailStringKeyedStream = orderStream.keyBy(new KeySelector() { @Override public Long getKey(OrderDetail deviceInfo) { return deviceInfo.getItemId(); } }); ``` 定义滑动窗口,总时长为 1 个小时,滑动窗口的最小分区为 5 秒,也就是每5秒计算一次 ``` WindowedStream window = orderDetailStringKeyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(3600), Time.seconds(5))); ``` 接下来,需要按商品id分组后,对购买数量聚合求和 ```java SingleOutputStreamOperator reduce = window.reduce(new ReduceFunction() { @Override public OrderDetail reduce(OrderDetail value1, OrderDetail value2) throws Exception { OrderDetail orderDetail = new OrderDetail(); orderDetail.setItemId(value1.getItemId()); orderDetail.setCount(value1.getCount() + value2.getCount()); System.out.println("聚合结果 ===" + JSON.toJSONString(orderDetail)); return orderDetail; } }); ``` ### 3、将计算结果推送给业务系统 业务系统通常是直接面向 C 端用户的,访问量一般很大 我们需要有一种中间介质,用来中转 Flink 实时计算出来的热点商品 id 数据 这里我们选择了 **Redis** 那么 Flink 如何将聚合后的 `SingleOutputStreamOperator` 数据流写入到 Redis ,代码如下: ```java // Redis 配置 FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build(); reduce.addSink(new RedisSink<>(conf, new RedisMapper() { @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.SET); } /** * 设置Key */ @Override public String getKeyFromData(OrderDetail data) { return "sku_id=" + String.valueOf(data.getItemId()); } /** * 设置value */ @Override public String getValueFromData(OrderDetail data) { return String.valueOf(data.getCount()); } })); ``` ### 4、运行效果 通过 Redis的管理工具(Redis Desktop Manager),查询缓存中 `**sku_id = 100**` 对应的数据
## 源码地址 > https://github.com/aalansehaiyang/xq_project