
Flink Source Code Analysis: How Does startupMode Take Effect?

I have long had a question: if consumer.setStartFromLatest() and kafkaProperties.put("auto.offset.reset", "earliest") are both set, which one actually takes effect? The answer is consumer.setStartFromLatest(), but why? Let's walk through the source code and find out.
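For context, here is a minimal, hypothetical job setup in which both options are present at the same time (the topic name, bootstrap servers and group id are made up, and the exact consumer class depends on your connector version); the rest of the article traces why only setStartFromLatest() ends up deciding the start position:

Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers", "localhost:9092"); // assumption: a local broker
kafkaProperties.put("group.id", "demo-group");              // hypothetical group id
// Kafka-level setting: as we will see, it is ignored once a Flink startup mode is chosen
kafkaProperties.put("auto.offset.reset", "earliest");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
		"demo-topic", new SimpleStringSchema(), kafkaProperties); // hypothetical topic
// Flink-level setting: this is what actually determines where consumption starts
consumer.setStartFromLatest();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(consumer).print();
env.execute("startup-mode-demo");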

@Override
	public void open(Configuration configuration) throws Exception {
		// determine the offset commit mode: ON_CHECKPOINTS, KAFKA_PERIODIC, or DISABLED; this article focuses on ON_CHECKPOINTS
		this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
				getIsAutoCommitEnabled(),
				enableCommitOnCheckpoints,
				((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

		// create the kafka partition discoverer
		this.partitionDiscoverer = createPartitionDiscoverer(
				topicsDescriptor,
				getRuntimeContext().getIndexOfThisSubtask(),
				getRuntimeContext().getNumberOfParallelSubtasks());
		this.partitionDiscoverer.open();

		subscribedPartitionsToStartOffsets = new HashMap<>();
		// get the partitions of the fixed topics or topic pattern that are assigned to this subtask
		final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
		// restore from checkpoint state
		if (restoredState != null) {
			for (KafkaTopicPartition partition : allPartitions) {
				// new partitions (those not present in the checkpointed state) will start from the earliest offset;
				// the old partitions were already restored from the checkpoint and are kept in subscribedPartitionsToStartOffsets below
				if (!restoredState.containsKey(partition)) {
					restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
				}
			}

			for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
				if (!restoredFromOldState) {
					// seed the partition discoverer with the union state while filtering out
					// restored partitions that should not be subscribed by this subtask
					if (KafkaTopicPartitionAssigner.assign(
						restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
							== getRuntimeContext().getIndexOfThisSubtask()){
						subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
					}
				} else {
					// when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
					// in this case, just use the restored state as the subscribed partitions
					subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
				}
			}

			if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
				subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
					if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
						LOG.warn(
							"{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
							entry.getKey());
						return true;
					}
					return false;
				});
			}

			LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
				getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
		} else {
			// use the partition discoverer to fetch the initial seed partitions,
			// and set their initial offsets depending on the startup mode.
			// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
			// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
			// when the partition is actually read.
			switch (startupMode) {
				case SPECIFIC_OFFSETS:
					if (specificStartupOffsets == null) {
						throw new IllegalStateException(
							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
								", but no specific offsets were specified.");
					}

					for (KafkaTopicPartition seedPartition : allPartitions) {
						// partitions with a user-specified offset start from that offset; partitions without one fall back to the committed group offset
						Long specificOffset = specificStartupOffsets.get(seedPartition);
						if (specificOffset != null) {
							// since the specified offsets represent the next record to read, we subtract
							// it by one so that the initial state of the consumer will be correct
							subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
						} else {
							// default to group offset behaviour if the user-provided specific offsets
							// do not contain a value for this partition
							// the corresponding startup sentinel is also stored in subscribedPartitionsToStartOffsets
							subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
						}
					}

					break;
				case TIMESTAMP:
					if (startupOffsetsTimestamp == null) {
						throw new IllegalStateException(
							"Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
								", but no startup timestamp was specified.");
					}

					for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
							: fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
						subscribedPartitionsToStartOffsets.put(
							partitionToOffset.getKey(),
							(partitionToOffset.getValue() == null)
									// if an offset cannot be retrieved for a partition with the given timestamp,
									// we default to using the latest offset for the partition
									? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
									// since the specified offsets represent the next record to read, we subtract
									// it by one so that the initial state of the consumer will be correct
									: partitionToOffset.getValue() - 1);
					}

					break;
				default:
					// EARLIEST, LATEST and GROUP_OFFSETS (the default): store the startup mode's sentinel value
					for (KafkaTopicPartition seedPartition : allPartitions) {
						subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
					}
			}

			if (!subscribedPartitionsToStartOffsets.isEmpty()) {
				switch (startupMode) {
					case EARLIEST:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							subscribedPartitionsToStartOffsets.keySet());
						break;
					case LATEST:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							subscribedPartitionsToStartOffsets.keySet());
						break;
					case TIMESTAMP:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							startupOffsetsTimestamp,
							subscribedPartitionsToStartOffsets.keySet());
						break;
					case SPECIFIC_OFFSETS:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							specificStartupOffsets,
							subscribedPartitionsToStartOffsets.keySet());

						List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
						for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
							if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
								partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
							}
						}

						if (partitionsDefaultedToGroupOffsets.size() > 0) {
							LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
									"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
								getRuntimeContext().getIndexOfThisSubtask(),
								partitionsDefaultedToGroupOffsets.size(),
								partitionsDefaultedToGroupOffsets);
						}
						break;
					case GROUP_OFFSETS:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							subscribedPartitionsToStartOffsets.keySet());
				}
			} else {
				LOG.info("Consumer subtask {} initially has no partitions to read from.",
					getRuntimeContext().getIndexOfThisSubtask());
			}
		}

The main job of open() is to populate the Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets with the user-specified topics, their partitions, and the corresponding start offsets. Next, let's look at the entry method through which Flink actually consumes from Kafka.
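As a hypothetical illustration (the topic name is made up), for a fresh start with no restored state and setStartFromLatest(), the map conceptually ends up holding sentinel values rather than real offsets:

// conceptual content of subscribedPartitionsToStartOffsets after open(),
// for a two-partition topic "demo-topic" and startupMode == LATEST (hypothetical example)
Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
subscribedPartitionsToStartOffsets.put(
		new KafkaTopicPartition("demo-topic", 0), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
subscribedPartitionsToStartOffsets.put(
		new KafkaTopicPartition("demo-topic", 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
// the sentinel is only resolved into a real offset later, in KafkaConsumerThread#reassignPartitions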

@Override
	// entry point: start the source
	public void run(SourceContext<T> sourceContext) throws Exception {
		if (subscribedPartitionsToStartOffsets == null) {
			throw new Exception("The partitions were not set for the consumer");
		}

		// initialize commit metrics and default offset callback method
		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);

		this.offsetCommitCallback = new KafkaCommitCallback() {
			@Override
			public void onSuccess() {
				successfulCommits.inc();
			}

			@Override
			public void onException(Throwable cause) {
				LOG.warn("Async Kafka commit failed.", cause);
				failedCommits.inc();
			}
		};

		// mark the subtask as temporarily idle if there are no initial seed partitions;
		// once this subtask discovers some partitions and starts collecting records, the subtask's
		// status will automatically be triggered back to be active.
		if (subscribedPartitionsToStartOffsets.isEmpty()) {
			sourceContext.markAsTemporarilyIdle();
		}

		// from this point forward:
		//   - 'snapshotState' will draw offsets from the fetcher,
		//     instead of being built from `subscribedPartitionsToStartOffsets`
		//   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
		//     Kafka through the fetcher, if configured to do so)
		this.kafkaFetcher = createFetcher(
				sourceContext,
				subscribedPartitionsToStartOffsets,
				periodicWatermarkAssigner,
				punctuatedWatermarkAssigner,
				(StreamingRuntimeContext) getRuntimeContext(),
				offsetCommitMode,
				getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
				useMetrics);

		if (!running) {
			return;
		}

		// depending on whether we were restored with the current state version (1.3),
		// remaining logic branches off into 2 paths:
		//  1) New state - partition discovery loop executed as separate thread, with this
		//                 thread running the main fetcher loop
		//  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
		if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
			kafkaFetcher.runFetchLoop();
		} else {
			runWithPartitionDiscovery();
		}
	}

createFetcher is passed the subscribedPartitionsToStartOffsets map we just built. Following the call chain, the map is handed to the KafkaFetcher constructor as an argument and finally ends up in the AbstractFetcher constructor.
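For reference, here is a simplified sketch of what the createFetcher override in FlinkKafkaConsumer roughly looks like (argument names and exact wiring vary slightly across Flink versions); the point is simply that subscribedPartitionsToStartOffsets arrives here as assignedPartitionsWithInitialOffsets and is forwarded into the KafkaFetcher constructor:

	@Override
	protected AbstractFetcher<T, ?> createFetcher(
			SourceContext<T> sourceContext,
			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
			StreamingRuntimeContext runtimeContext,
			OffsetCommitMode offsetCommitMode,
			MetricGroup consumerMetricGroup,
			boolean useMetrics) throws Exception {

		// (adjustments to the Kafka auto-commit config based on offsetCommitMode omitted here)

		return new KafkaFetcher<>(
				sourceContext,
				assignedPartitionsWithInitialOffsets,   // the map built in open()
				watermarksPeriodic,
				watermarksPunctuated,
				runtimeContext.getProcessingTimeService(),
				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
				runtimeContext.getUserCodeClassLoader(),
				runtimeContext.getTaskNameWithSubtasks(),
				deserializer,
				properties,
				pollTimeout,
				runtimeContext.getMetricGroup(),
				consumerMetricGroup,
				useMetrics);
	}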

	protected AbstractFetcher(
			SourceContext<T> sourceContext,
			Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
			ProcessingTimeService processingTimeProvider,
			long autoWatermarkInterval,
			ClassLoader userCodeClassLoader,
			MetricGroup consumerMetricGroup,
			boolean useMetrics) throws Exception {
		this.sourceContext = checkNotNull(sourceContext);
		this.checkpointLock = sourceContext.getCheckpointLock();
		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);

		this.useMetrics = useMetrics;
		this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
		this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
		this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);

		// figure out what watermark mode we will be using
		this.watermarksPeriodic = watermarksPeriodic;
		this.watermarksPunctuated = watermarksPunctuated;

		if (watermarksPeriodic == null) {
			if (watermarksPunctuated == null) {
				// simple case, no watermarks involved
				timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
			} else {
				timestampWatermarkMode = PUNCTUATED_WATERMARKS;
			}
		} else {
			if (watermarksPunctuated == null) {
				timestampWatermarkMode = PERIODIC_WATERMARKS;
			} else {
				throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
			}
		}

		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();

		// initialize subscribed partition states with seed partitions, according to whether
		// timestamps / watermarks are used.
		// subscribedPartitionStates holds a List<KafkaTopicPartitionState<KPH>>; each
		// KafkaTopicPartitionState carries the KafkaTopicPartition, its offset, and so on
		this.subscribedPartitionStates = createPartitionStateHolders(
				seedPartitionsWithInitialOffsets,
				timestampWatermarkMode,
				watermarksPeriodic,
				watermarksPunctuated,
				userCodeClassLoader);

		// check that all seed partition states have a defined offset
		// whether restored from a checkpoint or configured via consumer.setStartFrom...(), every seed partition must have an initial offset
		for (KafkaTopicPartitionState<KPH> partitionState : subscribedPartitionStates) {
			if (!partitionState.isOffsetDefined()) {
				throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
			}
		}

		// all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
		// at this point the Kafka consumer has not been assigned any partitions yet
		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
			unassignedPartitionsQueue.add(partition);
		}

		// register metrics for the initial seed partitions
		if (useMetrics) {
			registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
		}

		// if we have periodic watermarks, kick off the interval scheduler
		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
			@SuppressWarnings("unchecked")
			PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
					subscribedPartitionStates,
					sourceContext,
					processingTimeProvider,
					autoWatermarkInterval);

			periodicEmitter.start();
		}
	}

Then, from the constructor of KafkaFetcher (a subclass of AbstractFetcher), we can see that unassignedPartitionsQueue is in turn passed on to KafkaConsumerThread.

public KafkaFetcher(
		SourceFunction.SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
		ProcessingTimeService processingTimeProvider,
		long autoWatermarkInterval,
		ClassLoader userCodeClassLoader,
		String taskNameWithSubtasks,
		KafkaDeserializationSchema<T> deserializer,
		Properties kafkaProperties,
		long pollTimeout,
		MetricGroup subtaskMetricGroup,
		MetricGroup consumerMetricGroup,
		boolean useMetrics) throws Exception {
		super(
			sourceContext,
			assignedPartitionsWithInitialOffsets,
			watermarksPeriodic,
			watermarksPunctuated,
			processingTimeProvider,
			autoWatermarkInterval,
			userCodeClassLoader,
			consumerMetricGroup,
			useMetrics);

		this.deserializer = deserializer;
		this.handover = new Handover();

		this.consumerThread = new KafkaConsumerThread(
			LOG,
			handover,
			kafkaProperties,
			unassignedPartitionsQueue,
			getFetcherName() + " for " + taskNameWithSubtasks,
			pollTimeout,
			useMetrics,
			consumerMetricGroup,
			subtaskMetricGroup);
	}

When the KafkaConsumerThread is started, its run() method executes the following loop:

......
try {
					// hasAssignedPartitions defaults to false
					// when new partitions are discovered, they are added to unassignedPartitionsQueue and subscribedPartitionsToStartOffsets
					if (hasAssignedPartitions) {
						newPartitions = unassignedPartitionsQueue.pollBatch();
					}
					else {
						// if no assigned partitions block until we get at least one
						// instead of hot spinning this loop. We rely on a fact that
						// unassignedPartitionsQueue will be closed on a shutdown, so
						// we don't block indefinitely
						newPartitions = unassignedPartitionsQueue.getBatchBlocking();
					}
					// unassignedPartitionsQueue already holds the seed partitions, so newPartitions != null is true and reassignPartitions() runs
					if (newPartitions != null) {
						reassignPartitions(newPartitions);
					}
				} catch (AbortedReassignmentException e) {
					continue;
				}
......

	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
		if (newPartitions.size() == 0) {
			return;
		}
		hasAssignedPartitions = true;
		boolean reassignmentStarted = false;

		// since the reassignment may introduce several Kafka blocking calls that cannot be interrupted,
		// the consumer needs to be isolated from external wakeup calls in setOffsetsToCommit() and shutdown()
		// until the reassignment is complete.
		final KafkaConsumer<byte[], byte[]> consumerTmp;
		synchronized (consumerReassignmentLock) {
			// hand the consumer reference over to consumerTmp
			consumerTmp = this.consumer;
			this.consumer = null;
		}

		final Map<TopicPartition, Long> oldPartitionAssignmentsToPosition = new HashMap<>();
		try {
			// there are both new and old partitions because, when a partition-discovery interval
			// (FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS) is configured, the discoverer
			// periodically checks for newly added partitions and, if any are found, adds them to
			// unassignedPartitionsQueue
			for (TopicPartition oldPartition : consumerTmp.assignment()) {
				oldPartitionAssignmentsToPosition.put(oldPartition, consumerTmp.position(oldPartition));
			}

			final List<TopicPartition> newPartitionAssignments =
				new ArrayList<>(newPartitions.size() + oldPartitionAssignmentsToPosition.size());
			newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet());
			newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions));

			// reassign with the new partitions
			consumerTmp.assign(newPartitionAssignments);
			reassignmentStarted = true;

			// old partitions should be seeked to their previous position
			for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
				consumerTmp.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
			}

			// offsets in the state of new partitions may still be placeholder sentinel values if we are:
			//   (1) starting fresh,
			//   (2) checkpoint / savepoint state we were restored with had not completely
			//       been replaced with actual offset values yet, or
			//   (3) the partition was newly discovered after startup;
			// replace those with actual offsets, according to what the sentinel value represent.

			// this is where Kafka's own offset-related settings (such as auto.offset.reset) have no effect:
			// based on the sentinel returned by getOffset(), the consumer is seeked to the actual start offset,
			// and that sentinel, as we saw, comes from the startupMode
			for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
				if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
					consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
					newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
				} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
					consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
					newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
				} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
					// the KafkaConsumer by default will automatically seek the consumer position
					// to the committed group offset, so we do not need to do it.
					newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
				} else {
					consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
				}
			}
		} catch (WakeupException e) {
			// a WakeupException may be thrown if the consumer was invoked wakeup()
			// before it was isolated for the reassignment. In this case, we abort the
			// reassignment and just re-expose the original consumer.
			synchronized (consumerReassignmentLock) {
				this.consumer = consumerTmp;

				// if reassignment had already started and affected the consumer,
				// we do a full roll back so that it is as if it was left untouched
				if (reassignmentStarted) {
					this.consumer.assign(new ArrayList<>(oldPartitionAssignmentsToPosition.keySet()));
					for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
						this.consumer.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
					}
				}

				// no need to restore the wakeup state in this case,
				// since only the last wakeup call is effective anyways
				hasBufferedWakeup = false;

				// re-add all new partitions back to the unassigned partitions queue to be picked up again
				for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
					unassignedPartitionsQueue.add(newPartition);
				}

				// this signals the main fetch loop to continue through the loop
				throw new AbortedReassignmentException();
			}
		}

		// reassignment complete; expose the reassigned consumer
		synchronized (consumerReassignmentLock) {
			this.consumer = consumerTmp;

			// restore wakeup state for the consumer if necessary
			if (hasBufferedWakeup) {
				this.consumer.wakeup();
				hasBufferedWakeup = false;
			}
		}
	}