你知道Kafka创建Topic这个过程做了哪些事情吗?(附视频)
脚本参数
查看更具体参数
下面只是列出了跟 相关的参数
创建Topic脚本
zk方式(不推荐)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
需要注意的是--zookeeper后面接的是kafka的zk配置, 假如你配置的是localhost:2181/kafka 带命名空间的这种,不要漏掉了
kafka版本 >= 2.2 支持下面方式(推荐)
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
更多TopicCommand相关命令请看
当前分析的kafka源码版本为
创建Topic 源码分析
温馨提示: 如果阅读源码略显枯燥,你可以直接看源码总结以及后面部分
首先我们找到源码入口处, 查看一下 脚本的内容 最终是执行了这个类,找到这个地方之后就可以断点调试源码了,用IDEA启动 记得配置一下入参 比如:
1. 源码入口
上面的源码主要作用是
2. 创建AdminClientTopicService 对象
2.1 先创建 Admin
object AdminClientTopicService {
def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = {
bootstrapServer match {
case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)
case None =>
}
Admin.create(commandConfig)
}
def apply(commandConfig: Properties, bootstrapServer: Option[String]): AdminClientTopicService =
new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))
}
3. AdminClientTopicService.createTopic 创建Topic
case class AdminClientTopicService private (adminClient: Admin) extends TopicService {
override def createTopic(topic: CommandTopicPartition): Unit = {
//如果配置了副本副本数--replication-factor 一定要大于0
if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
//如果配置了--partitions 分区数 必须大于0
if (topic.partitions.exists(partitions => partitions < 1))
throw new IllegalArgumentException(s"The partitions must be greater than 0")
//查询是否已经存在该Topic
if (!adminClient.listTopics().names().get().contains(topic.name)) {
val newTopic = if (topic.hasReplicaAssignment)
//如果指定了--replica-assignment参数;则按照指定的来分配副本
new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
else {
new NewTopic(
topic.name,
topic.partitions.asJava,
topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
}
// 将配置--config 解析成一个配置map
val configsMap = topic.configsToAdd.stringPropertyNames()
.asScala
.map(name => name -> topic.configsToAdd.getProperty(name))
.toMap.asJava
newTopic.configs(configsMap)
//调用adminClient创建Topic
val createResult = adminClient.createTopics(Collections.singleton(newTopic))
createResult.all().get()
println(s"Created topic ${topic.name}.")
} else {
throw new IllegalArgumentException(s"Topic ${topic.name} already exists")
}
}
3.1 KafkaAdminClient.createTopics(NewTopic) 创建Topic
@Override
public CreateTopicsResult createTopics(final Collection newTopics,
final CreateTopicsOptions options) {
//省略部分源码...
Call call = new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
@Override
public CreateTopicsRequest.Builder createRequest(int timeoutMs) {
return new CreateTopicsRequest.Builder(
new CreateTopicsRequestData().
setTopics(topics).
setTimeoutMs(timeoutMs).
setValidateOnly(options.shouldValidateOnly()));
}
@Override
public void handleResponse(AbstractResponse abstractResponse) {
//省略
}
@Override
void handleFailure(Throwable throwable) {
completeAllExceptionally(topicFutures.values(), throwable);
}
};
}
这个代码里面主要看下Call里面的接口; 先不管Kafka如何跟服务端进行通信的细节; 我们主要关注创建Topic的逻辑;
4. 发起网络请求
5. Controller角色的服务端接受请求处理逻辑
首先找到服务端处理客户端请求的 源码入口 ⇒
主要看里面的 方法; 可以看到客户端的请求都在里面
5.1 KafkaApis.handle(request) 根据请求传递Api调用不同接口
进入方法可以看到根据 调用对应的方法,客户端传过来的是
5.2 KafkaApis.handleCreateTopicsRequest 处理创建Topic的请求
def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
// 部分代码省略
//如果当前Broker不是属于Controller的话,就抛出异常
if (!controller.isActive) {
createTopicsRequest.data.topics.asScala.foreach { topic =>
results.add(new CreatableTopicResult().setName(topic.name).
setErrorCode(Errors.NOT_CONTROLLER.code))
}
sendResponseCallback(results)
} else {
// 部分代码省略
}
adminManager.createTopics(createTopicsRequest.data.timeoutMs,
createTopicsRequest.data.validateOnly,
toCreate,
authorizedForDescribeConfigs,
handleCreateTopicsResults)
}
}
5.3 adminManager.createTopics()
创建主题并等等主题完全创建,回调函数将会在超时、错误、或者主题创建完成时触发
该方法过长,省略部分代码
def createTopics(timeout: Int,
validateOnly: Boolean,
toCreate: Map[String, CreatableTopic],
includeConfigsAndMetatadata: Map[String, CreatableTopicResult],
responseCallback: Map[String, ApiError] => Unit): Unit = {
// 1. map over topics creating assignment and calling zookeeper
val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
val metadata = toCreate.values.map(topic =>
try {
//省略部分代码
//检查Topic是否存在
//检查 --replica-assignment参数和 (--partitions || --replication-factor ) 不能同时使用
// 如果(--partitions || --replication-factor ) 没有设置,则使用 Broker的配置(这个Broker肯定是Controller)
// 计算分区副本分配方式
createTopicPolicy match {
case Some(policy) =>
//省略部分代码
adminZkClient.validateTopicCreate(topic.name(), assignments, configs)
if (!validateOnly)
adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
case None =>
if (validateOnly)
//校验创建topic的参数准确性
adminZkClient.validateTopicCreate(topic.name, assignments, configs)
else
//把topic相关数据写入到zk中
adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
}
}
5.4 写入zookeeper数据
我们进入到看看有哪些数据写入到了zk中;
def createTopicWithAssignment(topic: String,
config: Properties,
partitionReplicaAssignment: Map[Int, Seq[Int]]): Unit = {
validateTopicCreate(topic, partitionReplicaAssignment, config)
// 将topic单独的配置写入到zk中
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
// 将topic分区相关信息写入zk中
writeTopicPartitionAssignment(topic, partitionReplicaAssignment.mapValues(ReplicaAssignment(_)).toMap, isUpdate = false)
}
源码就不再深入了,这里直接详细说明一下
写入Topic配置信息
这里写入的数据,是我们入参时候传的topic配置; 这里的配置会覆盖默认配置
写入Topic分区副本信息
具体跟zk交互的地方在 这里包装了很多跟zk的交互;
6. Controller监听 , 通知Broker将分区写入磁盘
Controller 有监听zk上的一些节点; 在上面的流程中已经在zk中写入了 ; 这个时候Controller就监听到了这个变化并相应;
private def processTopicChange(): Unit = {
//如果处理的不是Controller角色就返回
if (!isActive) return
//从zk中获取 `/brokers/topics 所有Topic
val topics = zkClient.getAllTopicsInCluster
//找出哪些是新增的
val newTopics = topics -- controllerContext.allTopics
//找出哪些Topic在zk上被删除了
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)
deletedTopics.foreach(controllerContext.removeTopic)
addedPartitionReplicaAssignment.foreach {
case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
}
info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
s"[$addedPartitionReplicaAssignment]")
if (addedPartitionReplicaAssignment.nonEmpty)
onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}
6.1 onNewPartitionCreation 状态流转
/**
* This callback is invoked by the topic change callback with the list of failed brokers as input.
* It does the following -
* 1. Move the newly created partitions to the NewPartition state
* 2. Move the newly created partitions from NewPartition->OnlinePartition state
*/
private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = {
info(s"New partition creation callback for ${newPartitions.mkString(",")}")
partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
partitionStateMachine.handleStateChanges(
newPartitions.toSeq,
OnlinePartition,
Some(OfflinePartitionLeaderElectionStrategy(false))
)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
}
7. Broker收到LeaderAndIsrRequest 创建本地Log
上面步骤中有说到向副本所属Broker发送请求,那么这里做了什么呢 其实主要做的是 创建本地Log
代码太多,这里我们直接定位到只跟创建Topic相关的关键代码来分析
/**
* 如果日志已经存在,只返回现有日志的副本否则如果 isNew=true 或者如果没有离线日志目录,则为给定的主题和给定的分区创建日志 否则抛出 KafkaStorageException
*/
def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = {
logCreationOrDeletionLock synchronized {
getLog(topicPartition, isFuture).getOrElse {
// create the log if it has not already been created in another thread
if (!isNew && offlineLogDirs.nonEmpty)
throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")
val logDirs: List[File] = {
val preferredLogDir = preferredLogDirs.get(topicPartition)
if (isFuture) {
if (preferredLogDir == null)
throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
else if (getLog(topicPartition).get.dir.getParent == preferredLogDir)
throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
}
if (preferredLogDir != null)
List(new File(preferredLogDir))
else
nextLogDirs()
}
val logDirName = {
if (isFuture)
Log.logFutureDirName(topicPartition)
else
Log.logDirName(topicPartition)
}
val logDir = logDirs
.toStream // to prevent actually mapping the whole list, lazy map
.map(createLogDirectory(_, logDirName))
.find(_.isSuccess)
.getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))
.get // If Failure, will throw
val log = Log(
dir = logDir,
config = config,
logStartOffset = 0L,
recoveryPoint = 0L,
maxProducerIdExpirationMs = maxPidExpirationMs,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
scheduler = scheduler,
time = time,
brokerTopicStats = brokerTopicStats,
logDirFailureChannel = logDirFailureChannel)
if (isFuture)
futureLogs.put(topicPartition, log)
else
currentLogs.put(topicPartition, log)
info(s"Created log for partition $topicPartition in $logDir with properties " + s"{${config.originals.asScala.mkString(", ")}}.")
// Remove the preferred log dir since it has already been satisfied
preferredLogDirs.remove(topicPartition)
log
}
}
}
详细请看 【kafka源码】LeaderAndIsrRequest请求
源码总结
如果上面的源码分析,你不想看,那么你可以直接看这里的简洁叙述
Q&A
创建Topic的时候 在Zk上创建了哪些节点
接受客户端请求阶段:
Controller监听zk节点变更阶段
创建Topic的时候 什么时候在Broker磁盘上创建的日志文件
当Controller监听zk节点变更之后,将新增的Topic 解析好的分区状态流转 ->-> 当流转到的时候会像分区分配到的Broker发送一个请求,当Broker们收到这个请求之后,根据请求参数做一些处理,其中就包括检查自身有没有这个分区副本的本地Log;如果没有的话就重新创建;
如果我没有指定分区数或者副本数,那么会如何创建
我们都知道,如果我们没有指定分区数或者副本数, 则默认使用Broker的配置, 那么这么多Broker,假如不小心默认值配置不一样,那究竟使用哪一个呢? 那肯定是哪台机器执行创建topic的过程,就是使用谁的配置; 所以是谁执行的? 那肯定是Controller啊! 上面的源码我们分析到了,创建的过程,会指定Controller这台机器去进行;
如果我手动删除了下的某个节点会怎么样?
详情请看 【kafka实战】一不小心删除了下的某个Topic
如果我手动在zk中添加节点会怎么样
先说结论: 根据上面分析过的源码画出的时序图可以指定; 客户端发起创建Topic的请求,本质上是去zk里面写两个数据
下面不妨让我们来验证一下; 创建一个节点 节点数据为下面数据;
{"version":2,"partitions":{"2":[3],"1":[3],"0":[3]},"adding_replicas":{},"removing_replicas":{}}
这里我用的工具手动创建的,你也可以用命令行创建; 创建完成之后我们再看看本地有没有生成一个Log文件 可以看到我们指定的Broker,已经生成了对应的分区副本Log文件; 而且zk中也写入了其他的数据
如果写入节点之后Controller挂掉了会怎么样
先说结论:Controller 重新选举的时候,会有一些初始化的操作; 会把创建过程继续下去
然后我们来模拟这么一个过程,先停止集群,然后再zk中写入节点数据; 然后再启动一台Broker; 源码分析: 我们之前分析过Controller的启动过程与选举 有提到过,这里再提一下Controller当选之后有一个地方处理这个事情
replicaStateMachine.startup() partitionStateMachine.startup()
启动状态机的过程是不是跟上面的6.1 onNewPartitionCreation 状态流转 的过程很像; 最终都把状态流转到了; 伴随着是不发起了请求; 是不是Broker收到请求之后,创建本地Log文件了
附件
--config 可生效参数
请以 为准
configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
Tips:如果关于本篇文章你有疑问,可以在公众号给我留言
PS: 文章阅读的源码版本是kafka-2.5