原标题:大厂高频:Flink分区分配策略
目前Flink包含8个重分区算子,对应8个分区器(7个官方定义及1个自定义),均继承与父类StreamPartitioner。以下带大家了解Flink的各种分区策略。
1. RebalancePartitioner
2. RescalePartitioner
3. KeyGroupStreamPartitioner
4. GlobalPartitioner
5. ShufflePartitioner
6. ForwardPartitioner
7. BroadcastPartitioner
8. CustomPartitionerWrapper(自定义分区器)
分区器策略详解
概览图
1. GlobalPartitioner
GlobalPartitioner只会将数据输出到下游算子的0分区。
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return 0;
}
2. ShufflePartitioner
ShufflePartitioner会将数据随机输出到下游算子的并发实例。由于java.util.Random生成的随机数符合均匀分布,故能够近似保证平均。
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return random.nextInt(numberOfChannels);
}
3. RebalancePartitioner
RebalancePartitioner会先随机选择一个下游算子的实例,然后用轮询(round-robin)的方式从该实例开始循环输出。该方式能保证完全的下游负载均衡,所以常用来处理有倾斜的原数据流。
private int nextChannelToSendTo;
@Override
public void setup ( int numberOfChannels){
super.setup(numberOfChannels);
nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
}
@Override
public int selectChannel (SerializationDelegate < StreamRecord < T >> record) {
nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
return nextChannelToSendTo;
}
}
4. KeyGroupStreamPartitioner
这就是keyBy()算子底层所采用的StreamPartitioner,可见是先在key值的基础上经过了两重哈希得到key对应的哈希值,第一重是Java自带的hashCode(),第二重则是MurmurHash。然后将哈希值乘以算子并行度,并除以最大并行度,得到最终的分区ID。
@Override
public int selectChannel (SerializationDelegate < StreamRecord < T >> record) {
K key;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException(
“Could not extract key from ” + record.getInstance().getValue(),
e);
}
return KeyGroupRangeAssignment.assignKeyToParallelOperator(
key,
maxParallelism,
numberOfChannels);
}
public static int assignKeyToParallelOperator (Object key,
int maxParallelism, int parallelism){
return computeOperatorIndexForKeyGroup(
maxParallelism,
parallelism,
assignToKeyGroup(key, maxParallelism));
}
public static int assignToKeyGroup (Object key,int maxParallelism){
return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
public static int computeKeyGroupForKeyHash ( int keyHash, int maxParallelism){
return MathUtils.murmurHash(keyHash) % maxParallelism;
}
public static int computeOperatorIndexForKeyGroup ( int maxParallelism, int parallelism,
int keyGroupId){
return keyGroupId * parallelism / maxParallelism;
}
5. BroadcastPartitioner
BroadcastPartitioner是广播流专用的分区器。由于广播流发挥作用必须靠DataStream.connect()方法与正常的数据流连接起来,所以实际上不需要BroadcastPartitioner来选择分区(广播数据总会投递给下游算子的所有并发),selectChannel()方法也就不必实现了。
@Overridepublic
int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
throw new UnsupportedOperationException(“Broadcast partitioner does not support select channels.”);
}
@Overridepublic
boolean isBroadcast() {
return true;
}
6. RescalePartitioner
基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。
举例: 上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
if (++nextChannelToSendTo >= numberOfChannels) {
nextChannelToSendTo = 0;
}
return nextChannelToSendTo;
}
7. ForwardPartitioner
发送到下游对应的第一个task,保证上下游算子并行度一致,即上有算子与下游算子是1:1的关系
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return 0;
}
public StreamPartitioner<T> copy() {
return this; }
@Override
public String toString() {
return “FORWARD”;
}
总结
Flink分区策略是指对数据流进行分区的方法。它可以把无限的数据流按照一定的规则划分为多个分区,每个分区中的数据元素都有相同的key。Flink分区策略可以根据数据元素的key或其他属性来决定分区方式,并在每个分区中执行相同的操作,以满足特定的业务需求。返回搜狐,查看更多
责任编辑:
暂无评论内容