大厂高频:Flink分区分配策略

原标题:大厂高频:Flink分区分配策略

目前Flink包含8个重分区算子,对应8个分区器(7个官方定义及1个自定义),均继承与父类StreamPartitioner。以下带大家了解Flink的各种分区策略。

1. RebalancePartitioner

2. RescalePartitioner

3. KeyGroupStreamPartitioner

4. GlobalPartitioner

5. ShufflePartitioner

6. ForwardPartitioner

7. BroadcastPartitioner

8. CustomPartitionerWrapper(自定义分区器)

分区器策略详解

概览图

1. GlobalPartitioner

GlobalPartitioner只会将数据输出到下游算子的0分区。

@Override

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

return 0;

}

2. ShufflePartitioner

ShufflePartitioner会将数据随机输出到下游算子的并发实例。由于java.util.Random生成的随机数符合均匀分布,故能够近似保证平均。

@Override

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

return random.nextInt(numberOfChannels);

}

3. RebalancePartitioner

RebalancePartitioner会先随机选择一个下游算子的实例,然后用轮询(round-robin)的方式从该实例开始循环输出。该方式能保证完全的下游负载均衡,所以常用来处理有倾斜的原数据流。

private int nextChannelToSendTo;

@Override

public void setup ( int numberOfChannels){

super.setup(numberOfChannels);

nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);

}

@Override

public int selectChannel (SerializationDelegate < StreamRecord < T >> record) {

nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;

return nextChannelToSendTo;

}

}

4. KeyGroupStreamPartitioner

这就是keyBy()算子底层所采用的StreamPartitioner,可见是先在key值的基础上经过了两重哈希得到key对应的哈希值,第一重是Java自带的hashCode(),第二重则是MurmurHash。然后将哈希值乘以算子并行度,并除以最大并行度,得到最终的分区ID。

@Override

public int selectChannel (SerializationDelegate < StreamRecord < T >> record) {

K key;

try {

key = keySelector.getKey(record.getInstance().getValue());

} catch (Exception e) {

throw new RuntimeException(

“Could not extract key from ” + record.getInstance().getValue(),

e);

}

return KeyGroupRangeAssignment.assignKeyToParallelOperator(

key,

maxParallelism,

numberOfChannels);

}

public static int assignKeyToParallelOperator (Object key,

int maxParallelism, int parallelism){

return computeOperatorIndexForKeyGroup(

maxParallelism,

parallelism,

assignToKeyGroup(key, maxParallelism));

}

public static int assignToKeyGroup (Object key,int maxParallelism){

return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);

}

public static int computeKeyGroupForKeyHash ( int keyHash, int maxParallelism){

return MathUtils.murmurHash(keyHash) % maxParallelism;

}

public static int computeOperatorIndexForKeyGroup ( int maxParallelism, int parallelism,

int keyGroupId){

return keyGroupId * parallelism / maxParallelism;

}

5. BroadcastPartitioner

BroadcastPartitioner是广播流专用的分区器。由于广播流发挥作用必须靠DataStream.connect()方法与正常的数据流连接起来,所以实际上不需要BroadcastPartitioner来选择分区(广播数据总会投递给下游算子的所有并发),selectChannel()方法也就不必实现了。

@Overridepublic

int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

throw new UnsupportedOperationException(“Broadcast partitioner does not support select channels.”);

}

@Overridepublic

boolean isBroadcast() {

return true;

}

6. RescalePartitioner

基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。

举例: 上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。

若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

@Override

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

if (++nextChannelToSendTo >= numberOfChannels) {

nextChannelToSendTo = 0;

}

return nextChannelToSendTo;

}

7. ForwardPartitioner

发送到下游对应的第一个task,保证上下游算子并行度一致,即上有算子与下游算子是1:1的关系

@Override

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

return 0;

}

public StreamPartitioner<T> copy() {

return this; }

@Override

public String toString() {

return “FORWARD”;

}

总结

Flink分区策略是指对数据流进行分区的方法。它可以把无限的数据流按照一定的规则划分为多个分区,每个分区中的数据元素都有相同的key。Flink分区策略可以根据数据元素的key或其他属性来决定分区方式,并在每个分区中执行相同的操作,以满足特定的业务需求。返回搜狐,查看更多

责任编辑:

    THE END
    喜欢就支持一下吧
    点赞8 分享
    评论 抢沙发
    头像
    欢迎您留下宝贵的见解!
    提交
    头像

    昵称

    取消
    昵称表情代码图片

      暂无评论内容