topicconfigManager类
主要流程为
1.监控config/change节点,那个topic的config变化了
2.从zk上的topic的config目录,获取最新config信息
3.更新logmanager里指定topic的tplog(每个topic每个partition对应一个log)配置
/** * 注册config change的listener * Begin watching for config changes */ def startup() { ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath) //监听/config/changes的子节点,ConfigChangeListener zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener) //启动服务,检查是否有topic的config需要更新,使用跟ConfigChangeListener相同的方法processConfigChanges processAllConfigChanges() }
主要方法processConfigChanges
/** * change config topic需要 * 1.设置zk上的topic config; * 2.在zk上添加一个notification,标志哪个topic的config被改变 * Process the given list of config changes */ private def processConfigChanges(notifications: Seq[String]) { if (notifications.size > 0) { info("Processing config change notification(s)...") val now = time.milliseconds val logs = logManager.logsByTopicPartition.toBuffer //group by topic,Buffer[Log] buffer._2 := Log val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) for (notification <- notifications) { val changeId = changeNumber(notification) if (changeId > lastExecutedChange) {//changeid是比现在新的 val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode) if(jsonOpt.isDefined) { val json = jsonOpt.get val topic = json.substring(1, json.length - 1) // hacky way to dequote if (logsByTopic.contains(topic)) { /* combine the default properties with the overrides in zk to create the new LogConfig */ val props = new Properties(logManager.defaultConfig.toProps) props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))//获得最新topic config和default prop的合并值 val logConfig = LogConfig.fromProps(props) for (log <- logsByTopic(topic))//获得当前logmanager对象中所有这个topic的log对象 log.config = logConfig info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props)) purgeObsoleteNotifications(now, notifications) } } lastExecutedChange = changeId } } } }
相关推荐
SpringBoot整合Kafka配置类方式
java集成kafka单机或kafka集群,并对kafka进行常用的操作,封装成工具方法,亲体整理,没有坑
kafka详细配置
kafka连接工具
kafka kafka kafka kafka kafka
一共包含两个程序,分别是Kafka生产者工具、Kafka消费者工具。 1、使用bootstrap、userName、password连接kafka。 2、可使用text、json格式发送topic消息。 3、异步producer、customer,收发消息畅通无阻。 Kafka...
kafka
kafka封装类,简单封装了kafka的订阅和发布,方便订阅数据
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
kafka kafka kafka
kafka_2.11-2.0.0.tgz, kafka_2.11-2.0.1.tgz, kafka_2.11-2.1.0.tgz, kafka_2.11-2.1.1.tgz, kafka_2.11-2.2.0.tgz, kafka_2.11-2.2.1.tgz, kafka_2.11-2.2.2.tgz, kafka_2.11-2.3.0.tgz, kafka_2.11-2.3.1.tgz, ...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
kafka java依赖包下载 kafka java依赖包下载 kafka java依赖包下载
kafka 插件
kafka的docker镜像包含了kafka,zookeeper 和kafkamanager,可以通过docker 来load 安装
1、图形化界面可以直观地查看 Kafka 的 Topic 里的内容 2、自由设置 Kafka 数据展示格式 3、使用 Kafka Tool 创建/删除 Topic 4、使用 Kafka Tool 模拟发送 Messages
基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...
最新的已编译的kafka manager包,解压之后可以直接使用 需要已安装jdk8
kafkapython教程_Kafka快速⼊门(⼗⼆)——Python客户端 Kafka快速⼊门(⼗⼆)——Python客户端 ⼀、confluent-kafka 1、confluent-kafka简介 confluent-kafka是Python模块,是对librdkafka的轻量级封装,⽀持Kafka ...
5、kafka监控工具Kafka-Eagle介绍及使用 网址:https://blog.csdn.net/chenwewi520feng/article/details/130581571 本文主要介绍了kafka监控工具Kafka-Eagle的使用。 本文依赖:kafka、zookeeper部署完成。 本分分为...