Flink实战小练习

写这篇文章,记录我2019年时做Flink的编程练习,记录一些关键思想套路,为以后举一反三,触类旁通之用。以下需求都非常基础和简单,关键在思路,不在意代码细节。

项目场景

普通的互联网电商平台,一般有如下的数据统计需求

热门统计

通过观察用户的点击浏览行为,对用户进行流量统计,近期热门商品统计等。

偏好统计

通过观察用户的偏好,比如用户对某个商品做了收藏,点赞,打分等显式的情感偏好表达,我们进行用户画像分析,提供商品推荐列表供用户选择

3. 风险控制

对用户的非业务需求,即常规的普通使用需求,比如登录,下单支付,我们要提供后台安全保障和风险控制措施。并且对异常情况要及时预警告知,比如异地登录,支付密码多次错误等存在风险的情况。

综上需求简单归纳分类到工程模块,我们可以分成四个类别工程模块组件

实时热门商品统计实时流量统计恶意登录监控订单支付失效监控

因为偏好统计涉及到特征工程和机器学习的算法训练,这篇文章暂时不讨论这类需求。

数据源

用户行为数据表userIditemidcategoryIdbehaviortimestamp00000100000101buy

userid是用户ID,itemid是商品ID,categoryId是商品类别ID,behavior是行为类型比如(购买buy,浏览pv等),timestamp就是时间戳。

2. 日志表

ipuserIdeventTimemethodurl192.168.1.107000001GET/project/get

ip指IP地址,userId是用户ID,eventTime是事件事件,method是访问方法,url即访问地址。

需求一

问题: 每隔5分钟输出最近一小时内点击量最多的前N个商品,热门度点击量用浏览次数(“pv”)来衡量

思路图

第一:按照商品ID(itemid)进行分流

第二:将上图分流后的数据进行按时间窗口划分

第三:每个窗口聚合汇总该窗口内的商品ID的点击量,需要了解的是同一份数据会被分发到不同的窗口。此时将是一个商品ID对应一个时间窗口

第四:统计每个时间窗口里面的所有商品ID(itemid),此时,一个窗口内部汇聚多个商品ID,然后进行降序排序。

上图中将同颜色的窗口汇聚在一块,然后降序排序点击量,输出每个窗口的商品ID。

解题:

在所有用户行为数据中,过滤出浏览(“pv”)行为进行统计,构建滑动窗口,窗口长度为1小时,滑动距离为5分钟,做滑动窗口聚合。举例而言,分别统计[09:00, 10:00), [09:05, 10:05), [09:10, 10:10)…等窗口的商品点击量。(Sliding Window),注意,要以itemid作为窗口划分的键,从而分组统计,关键思想是,先按照商品IDitemid进行分流,再利用窗口函数和聚合函数计算出每一个itemid分别在不同时间窗口中的点击量,此时商品ID(itemid)对应一个窗口,然后在不同窗口时间范围内按照某个唯一键值进行汇总统计所有商品ID,汇聚每个时间窗口的所有商品itemID的点击量,再降序排序,就可以得到每个窗口点击量的TOPN的商品itemid。

代码如下

.keyBy(“itemId”) .timeWindow(Time.minutes(60), Time.minutes(5)) .aggregate(new CountAgg(), new WindowResultFunction());

备注:

1. CountAgg() 继承自AggregateFunction,提前聚合掉一些数据,减少state的存储压力,这和Spark里面aggregate的聚合算子原理一样。

2. WindowResultFunction 继承WindowFunction,用于定义输出数据格式。

2. 按自定义的键值汇总每个窗口里面的所有商品及其点击量,输出每个窗口中点击量前N名的商品

代码如下

.keyBy(“windowEnd”) .process(new TopNHotItems(3)); // 求点击量前3名的商品

process方法的ProcessFunction入参提供定时器timer的功能,我们将利用timer来判断何时收齐了某个window下所有商品的点击量数据。每当收到一条数据,我们就注册一个windowEnd+1的定时器。windowEnd+1的定时器被触发时,意味着收到了windowEnd+1的Watermark,即收齐了该windowEnd下的所有商品窗口统计值。我们在onTimer()中处理将收集的所有商品及点击量进行排序,选出TopN。

备注: TopNHotItems继承自KeyedProcessFunction,重写onTimer方法,获取收到的所有商品点击量,然后在窗口内按照点击量降序排序。

代码注意

显式告诉Flink使用EventTime,默认是ProcessingTime,如下
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

2. 生成Watermark,Watermark是用来追踪业务事件,指示当前处理到什么时刻的数据了。假设我们的数据源不存在乱序,事件的时间戳都是单调递增的,每条数据的业务时间就是Watermark。如下

.assignAscendingTimestamps(_.timestamp * 1000)

真实场景是乱序的,使用BoundedOutOfOrdernessTimestampExtractor。

需求二:页面浏览数统计,即统计在一段时间内用户访问某个url的次数,输出某段时间内访问量最多的前N个URL。

如每隔5秒,输出最近10分钟内访问量最多的前N个URL。

解题思路与需求一相同,不赘述。

需求三:恶意登录监控

用户在短时间内频繁登录失败,有程序恶意攻击的可能,同一用户(可以是不同IP)在2秒内连续两次登录失败,需要报警。

第一种想法,可以利用ListState,设定定时器N秒后触发,查看ListState中有多少次失败的登录。这种想法实现相对简单,把每次登录失败的数据存起来、设置定时器一段时间后再读取,这种做法尽管简单,但这种做法只能隔2秒之后去判断一下这期间是否有多次失败登录,而不是在一次登录失败之后、再一次登录失败时就立刻报警。所以不够完善。

第二种想法使用CEP(复杂事件处理)库实现事件流的模式匹配,用于在流中筛选符合某种复杂模式的事件。

// 定义匹配模式 val loginFailPattern = Pattern.begin[LoginEvent](“begin”) .where(_.eventType == “fail”) .next(“next”) .where(_.eventType == “fail”) .within(Time.seconds(2)) // 在数据流中匹配出定义好的模式 val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern) // .select方法传入一个 pattern select function,当检测到定义好的模式序列时就会调用 val loginFailDataStream = patternStream .select((pattern: Map[String, Iterable[LoginEvent]]) => { val first = pattern.getOrElse(“begin”, null).iterator.next() val second = pattern.getOrElse(“next”, null).iterator.next() (second.userId, second.ip, second.eventType) }) // 将匹配到的符合条件的事件打印出来 loginFailDataStream.print()

需求四:订单支付实时监控

最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低,并且为了降低安全风险,电商网站往往会对订单状态进行监控。设置一个失效时间(比如15分钟),如果下单后一段时间仍未支付,订单就会被取消。

我们依然使用CEP库来实现这个功能。

模式设置如下

val orderPayPattern = Pattern.begin[OrderEvent](“begin”) .where(_.eventType == “create”) .next(“next”) .where(_.eventType == “pay”) .within(Time.seconds(5))

以上表达式是说明,在15分钟内,事件“create”和“pay”是相互上下游依赖的。

随后, 订单事件流根据 orderId 分流,然后在每一条流中匹配出定义好的模式

val patternStream = CEP.pattern(orderEventStream.keyBy(“orderId”), orderPayPattern)

随后

val complexResult = patternStream.select(orderTimeoutOutput) { // 对于已超时的部分模式匹配的事件序列,会调用这个函数 (pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => { val createOrder = pattern.get(“begin”) OrderResult(createOrder.get.iterator.next().orderId, “timeout”) } } { // 检测到定义好的模式序列时,就会调用这个函数 pattern: Map[String, Iterable[OrderEvent]] => { val payOrder = pattern.get(“next”) OrderResult(payOrder.get.iterator.next().orderId, “success”) } }

    THE END
    喜欢就支持一下吧
    点赞9 分享
    评论 抢沙发
    头像
    欢迎您留下宝贵的见解!
    提交
    头像

    昵称

    取消
    昵称表情代码图片

      暂无评论内容