Flink Source Code Analysis: How Flink Reads Data from Kafka
Let's start with the FlinkKafkaConsumerBase.run method, which is effectively the entry point for Flink to pull data from Kafka:
// entry point: start the source
public void run(SourceContext<T> sourceContext) throws Exception {
    ......
    // from this point forward:
    //   - 'snapshotState' will draw offsets from the fetcher,
    //     instead of being built from `subscribedPartitionsToStartOffsets`
    //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
    //     Kafka through the fetcher, if configured to do so)

    // create the fetcher that pulls data from Kafka
    this.kafkaFetcher = createFetcher(
            sourceContext,
            subscribedPartitionsToStartOffsets,
            periodicWatermarkAssigner,
            punctuatedWatermarkAssigner,
            (StreamingRuntimeContext) getRuntimeContext(),
            offsetCommitMode,
            getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
            useMetrics);

    if (!running) {
        return;
    }

    // depending on whether we were restored with the current state version (1.3),
    // remaining logic branches off into 2 paths:
    //   1) New state - partition discovery loop executed as separate thread, with this
    //      thread running the main fetcher loop
    //   2) Old state - partition discovery is disabled and only the main fetcher loop is executed
    if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
        // KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS was not configured
        kafkaFetcher.runFetchLoop();
    } else {
        // internally this still calls kafkaFetcher.runFetchLoop()
        runWithPartitionDiscovery();
    }
}
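Before going deeper into the source, here is a minimal user-side sketch of how this code path gets exercised; the bootstrap server, topic and group id are placeholders and not taken from the source above. Setting flink.partition-discovery.interval-millis (KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS) is what makes run() take the runWithPartitionDiscovery() branch:

// minimal user-side sketch (placeholders: bootstrap server, topic, group id)
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group");
        // enabling this property makes run() take the runWithPartitionDiscovery() branch
        props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // when the job starts, the source task invokes FlinkKafkaConsumerBase.run(...) shown above
        env.addSource(new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("kafka-source-demo");
    }
}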
The createFetcher method:
@Override
protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
    ......
    return new KafkaFetcher<>(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics);
}
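As an aside, the watermarksPeriodic / watermarksPunctuated parameters carry the (serialized) assigners that the user registers directly on the consumer before the job runs. A hedged sketch of where the periodic one comes from, reusing the placeholder props and topic from the user-side sketch above (the timestamp logic is purely illustrative):

// hedged sketch: registering a periodic watermark assigner directly on the Kafka consumer.
// This assigner is what later reaches createFetcher() as the serialized watermarksPeriodic argument.
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
    @Override
    public long extractAscendingTimestamp(String element) {
        // purely illustrative: real code would extract the event time from the element itself
        return System.currentTimeMillis();
    }
});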
createFetcher returns a KafkaFetcher object; let's step into it.
The KafkaFetcher constructor creates a KafkaConsumerThread object:
public KafkaFetcher(
        SourceFunction.SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval,
        ClassLoader userCodeClassLoader,
        String taskNameWithSubtasks,
        KafkaDeserializationSchema<T> deserializer,
        Properties kafkaProperties,
        long pollTimeout,
        MetricGroup subtaskMetricGroup,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
    ......
    this.consumerThread = new KafkaConsumerThread(
            LOG,
            // one of the KafkaConsumerThread constructor arguments
            handover,
            kafkaProperties,
            // what exactly unassignedPartitionsQueue is will be covered in detail in
            // "How Flink's startupMode takes effect"
            unassignedPartitionsQueue,
            getFetcherName() + " for " + taskNameWithSubtasks,
            pollTimeout,
            useMetrics,
            consumerMetricGroup,
            subtaskMetricGroup);
}
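The key collaborator wired in here is the Handover: a single-slot exchange point between the KafkaConsumerThread (which produces batches of records) and the fetcher's main loop (which consumes them via pollNext()). Below is a simplified sketch of that pattern only, not the actual org.apache.flink.streaming.connectors.kafka.internal.Handover class, which additionally propagates errors and supports waking up the producer:

// simplified sketch of the one-slot handover pattern between the two threads
final class SimpleHandover<E> {
    private final Object lock = new Object();
    private E next; // at most one element in flight

    // called by the consumer thread after each poll()
    public void produce(E element) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait(); // wait until the fetcher has taken the previous batch
            }
            next = element;
            lock.notifyAll();
        }
    }

    // called by the fetcher's main loop; blocks until a batch is available
    public E pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait();
            }
            E result = next;
            next = null;
            lock.notifyAll();
            return result;
        }
    }
}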
That wraps up createFetcher, which can be seen as the preparation work before any data is pulled. Next, let's look at kafkaFetcher.runFetchLoop().
The runFetchLoop method of KafkaFetcher is where pulling messages from Kafka actually starts:
// fetch messages from Kafka
public void runFetchLoop() throws Exception {
    try {
        // the handover passed into the KafkaConsumerThread constructor
        final Handover handover = this.handover;

        // kick off the actual Kafka consumer
        // this is the thread that actually pulls data from Kafka
        consumerThread.start();

        while (running) {
            // this blocks until we get the next records
            // it automatically re-throws exceptions encountered in the consumer thread
            // take the next batch out of the handover and process it
            final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

            // get the records for each topic partition
            for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

                List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                        records.records(partition.getKafkaPartitionHandle());

                for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                    final T value = deserializer.deserialize(record);

                    if (deserializer.isEndOfStream(value)) {
                        // end of stream signaled
                        running = false;
                        break;
                    }

                    // emit the actual record. this also updates offset state atomically
                    // and deals with timestamps and watermark generation
                    emitRecord(value, partition, record.offset(), record);
                }
            }
        }
    }
    finally {
        // this signals the consumer thread that no more work is to be done
        consumerThread.shutdown();
    }

    // on a clean exit, wait for the runner thread
    try {
        consumerThread.join();
    }
    catch (InterruptedException e) {
        // may be the result of a wake-up interruption after an exception.
        // we ignore this here and only restore the interruption state
        Thread.currentThread().interrupt();
    }
}
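The deserializer used in the loop above is a KafkaDeserializationSchema. Here is a minimal sketch of one, just to make clear where deserialize() and isEndOfStream() plug into runFetchLoop(); treating the literal value "EOS" as an end-of-stream marker is an invented convention for illustration, not something Flink defines:

// minimal sketch of a KafkaDeserializationSchema as consumed by runFetchLoop()
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class StringWithEndMarkerSchema implements KafkaDeserializationSchema<String> {

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        return new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // when this returns true, runFetchLoop() sets running = false and stops fetching
        return "EOS".equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}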
Since consumerThread.start() kicks off the actual Kafka consumer, let's take a look at the run method of KafkaConsumerThread:
@Override
public void run() {
    // early exit check
    if (!running) {
        return;
    }

    // this is the means to talk to FlinkKafkaConsumer's main thread
    // (the same handover the main thread reads from via handover.pollNext() above)
    final Handover handover = this.handover;

    // This method initializes the KafkaConsumer and guarantees it is torn down properly.
    // This is important, because the consumer has multi-threading issues,
    // including concurrent 'close()' calls.
    try {
        // obtain the KafkaConsumer
        this.consumer = getConsumer(kafkaProperties);
    }
    catch (Throwable t) {
        handover.reportError(t);
        return;
    }

    // from here on, the consumer is guaranteed to be closed properly
    ......

    // early exit check
    if (!running) {
        return;
    }

    // the latest bulk of records. May carry across the loop if the thread is woken up
    // from blocking on the handover
    ConsumerRecords<byte[], byte[]> records = null;

    // reused variable to hold found unassigned new partitions.
    // found partitions are not carried across loops using this variable;
    // they are carried across via re-adding them to the unassigned partitions queue
    List<KafkaTopicPartitionState<TopicPartition>> newPartitions;

    // main fetch loop
    while (running) {

        // check if there is something to commit
        // commitInProgress is false by default
        if (!commitInProgress) {
            // get and reset the work-to-be committed, so we don't repeatedly commit the same
            // see [How Flink saves offsets](https://www.jianshu.com/p/ee4fe63f0182) for details
            final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
                    nextOffsetsToCommit.getAndSet(null);
            ......
        }
        ......
    }
}
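Stripped of Flink's offset-commit and partition-assignment plumbing, the consumer thread's main loop boils down to a standard KafkaConsumer poll loop. A hedged sketch of that reduced loop follows (broker address, topic and partition are placeholders); in the real thread each polled batch is not processed locally but handed to the fetcher via handover.produce(records):

// bare-bones sketch of what the consumer thread's main loop reduces to
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class BarePollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Flink assigns partitions explicitly instead of using consumer group subscription
            consumer.assign(Collections.singletonList(new TopicPartition("demo-topic", 0)));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                // in Flink, the batch is handed to the fetcher here: handover.produce(records)
            }
        }
    }
}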
That concludes how Flink pulls data from Kafka.