1
0

电商平台的热点商品架构方案.md 7.3 KB


title: 电商平台的热点商品架构方案

电商平台的热点商品架构方案

作者:Tom哥
公众号:微观技术
博客:https://offercome.cn
人生理念:知道的越多,不知道的越多,努力去学

电商平台大家都不陌生,即使没做过也经常使用,相信各位小伙伴都有体感。

就像微博热搜一样,电商平台有很多热点商品,比如:很多网红主播正在直播带货,瞬间带来大流量,对系统产生很大冲击。

今天,我们就来讲下关于热点商品的技术架构方案,分为几个部分

  1. 收集交易系统的订单数据
  2. 计算出热点商品
  3. 将计算结果推送给业务系统

一、整体架构

首先我们先来张整理架构图:

二、配置环境

技术方案用到了 Kafka、Redis 中间件,先来做前置准备工作,将这些软件安装好

1、安装 Kafka

apache 官网下载对应的版本,下载地址

https://kafka.apache.org/downloads

本文实验用的版本是 :kafka_2.12-2.6.0,对压缩包解压

tar -zxvf kafka_2.12-2.6.0.tgz
cd kafka_2.12-2.6.0

启动 ZooKeeper

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

启动 Kafka Server

nohup bin/kafka-server-start.sh config/server.properties &

创建一个订单 Topic,名字为 create_order_topic,用来存储下单消息

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic create_order_topic

终端执行上述命令,启动成功后,可以看到如下图:

2、安装 Redis

从官网上下载一个稳定的 Redis 版本

http://download.redis.io/releases/

本文测试的版本是 :redis-6.0.6,压缩包解压

tar xzf redis-6.0.6.tar.gz
cd redis-6.0.6
make

进入 src 目录,在本地启动一个 Redis 单机实例

src/redis-server

启动成功后,可以看到如下图:

也可以在终端新开个窗口,测试一些 Redis 命令

三、技术方案(含代码)

1、收集交易系统的订单数据

定义一个订单消息实体,这里我们做下简化,只保留一些核心字段,如:用户id 、商品id 、订单金额 、购买数量 等

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderDetail implements Serializable {

    private Long userId; //用户id
    private Long itemId; //商品id
    private Double price;//订单金额
    private Long count;//购买数量
    private Long timeStamp;//下单时间

}

然后,定义消息的发送端。

通常的逻辑是将订单数据先存储到本地数据库,然后发送一个异步消息,保证两个操作的事务性。

这里为了简化,只是定义了一个消息发送器,采用无限循环,模拟生成订单消息。

然后,将消息存储到了 Kafka 中

/**
 * 业务系统,模拟创建下单消息
 */
public class KafkaProducer {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.enableCheckpointing(5000);

        DataStreamSource<String> orderStream = env.addSource(new MakeOrderSourceFunction()).setParallelism(1);

        // Kafka 配置参数
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
                "127.0.0.1:9092",   //broker列表
                "create_order_topic", //topic
                new SimpleStringSchema());    // 消息序列化

        //写入Kafka时附加记录的事件时间戳
        producer.setWriteTimestampToKafka(true);

        orderStream.addSink(producer);
        env.execute();
    }

}

2、计算出热点商品

作为消费端,从 kafka 中拉取订单消息,得到字符串格式的数据流

// 配置 kafka 参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(createOrderTopic, new SimpleStringSchema(), properties);
// 从最早开始消费
consumer.setStartFromLatest();

将数据流反序列化成 OrderDetail 对象

// 数据反序列化
DataStream<OrderDetail> orderStream = stream.map(message -> {
    //System.out.println(message);
    return JSON.parseObject(message, OrderDetail.class);
});

按商品id 进行分组、聚合

// 按 商品id 进行聚合
KeyedStream<OrderDetail, Long> orderDetailStringKeyedStream = orderStream.keyBy(new KeySelector<OrderDetail, Long>() {
    @Override
    public Long getKey(OrderDetail deviceInfo) {
        return deviceInfo.getItemId();
    }
});

定义滑动窗口,总时长为 1 个小时,滑动窗口的最小分区为 5 秒,也就是每5秒计算一次

WindowedStream<OrderDetail, Long, TimeWindow> window = 
orderDetailStringKeyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(3600), Time.seconds(5)));

接下来,需要按商品id分组后,对购买数量聚合求和

SingleOutputStreamOperator<OrderDetail> reduce = window.reduce(new ReduceFunction<OrderDetail>() {
    @Override
    public OrderDetail reduce(OrderDetail value1, OrderDetail value2) throws Exception {

        OrderDetail orderDetail = new OrderDetail();
        orderDetail.setItemId(value1.getItemId());
        orderDetail.setCount(value1.getCount() + value2.getCount());
        System.out.println("聚合结果 ===" + JSON.toJSONString(orderDetail));
        return orderDetail;
    }
});

3、将计算结果推送给业务系统

业务系统通常是直接面向 C 端用户的,访问量一般很大

我们需要有一种中间介质,用来中转 Flink 实时计算出来的热点商品 id 数据

这里我们选择了 Redis

那么 Flink 如何将聚合后的 SingleOutputStreamOperator 数据流写入到 Redis ,代码如下:

// Redis 配置
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();
reduce.addSink(new RedisSink<>(conf, new RedisMapper<OrderDetail>() {

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.SET);
    }

    /**
     * 设置Key
     */
    @Override
    public String getKeyFromData(OrderDetail data) {
        return "sku_id=" + String.valueOf(data.getItemId());
    }

    /**
     * 设置value
     */
    @Override
    public String getValueFromData(OrderDetail data) {
        return String.valueOf(data.getCount());
    }
}));

4、运行效果

通过 Redis的管理工具(Redis Desktop Manager),查询缓存中 **sku_id = 100** 对应的数据

源码地址

https://github.com/aalansehaiyang/xq_project