`

kafka获得最新partition offset

阅读更多

kafka获得partition下标,需要用到kafka的simpleconsumer

 

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.Map.Entry;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.SimpleConsumer;

public class KafkaOffsetTools {

	public static void main(String[] args) {
		// 读取kafka最新数据
		// Properties props = new Properties();
		// props.put("zookeeper.connect",
		// "192.168.6.18:2181,192.168.6.20:2181,192.168.6.44:2181,192.168.6.237:2181,192.168.6.238:2181/kafka-zk");
		// props.put("zk.connectiontimeout.ms", "1000000");
		// props.put("group.id", "dirk_group");
		//
		// ConsumerConfig consumerConfig = new ConsumerConfig(props);
		// ConsumerConnector connector =
		// Consumer.createJavaConsumerConnector(consumerConfig);

		String topic = "dirkz";
		String seed = "118.26.148.18";
		int port = 9092;
		if (args.length >= 3) {
			topic = args[0];
			seed = args[1];
			port = Integer.valueOf(args[2]);
		}
		List<String> seeds = new ArrayList<String>();
		seeds.add(seed);
		KafkaOffsetTools kot = new KafkaOffsetTools();

		TreeMap<Integer,PartitionMetadata> metadatas = kot.findLeader(seeds, port, topic);
		
		int sum = 0;
		
		for (Entry<Integer,PartitionMetadata> entry : metadatas.entrySet()) {
			int partition = entry.getKey();
			String leadBroker = entry.getValue().leader().host();
			String clientName = "Client_" + topic + "_" + partition;
			SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000,
					64 * 1024, clientName);
			long readOffset = getLastOffset(consumer, topic, partition,
					kafka.api.OffsetRequest.LatestTime(), clientName);
			sum += readOffset;
			System.out.println(partition+":"+readOffset);
			if(consumer!=null)consumer.close();
		}
		System.out.println("总和:"+sum);

	}

	public KafkaOffsetTools() {
//		m_replicaBrokers = new ArrayList<String>();
	}

//	private List<String> m_replicaBrokers = new ArrayList<String>();

	public static long getLastOffset(SimpleConsumer consumer, String topic,
			int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
				partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
				whichTime, 1));
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
				requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
				clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out
					.println("Error fetching data Offset Data the Broker. Reason: "
							+ response.errorCode(topic, partition));
			return 0;
		}
		long[] offsets = response.offsets(topic, partition);
//		long[] offsets2 = response.offsets(topic, 3);
		return offsets[0];
	}

	private TreeMap<Integer,PartitionMetadata> findLeader(List<String> a_seedBrokers,
			int a_port, String a_topic) {
		TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
		loop: for (String seed : a_seedBrokers) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
						"leaderLookup"+new Date().getTime());
				List<String> topics = Collections.singletonList(a_topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						map.put(part.partitionId(), part);
//						if (part.partitionId() == a_partition) {
//							returnMetaData = part;
//							break loop;
//						}
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with Broker [" + seed
						+ "] to find Leader for [" + a_topic + ", ] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}
//		if (returnMetaData != null) {
//			m_replicaBrokers.clear();
//			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
//				m_replicaBrokers.add(replica.host());
//			}
//		}
		return map;
	}

}

 

2
2
分享到:
评论
4 楼 dsxwjhf 2016-11-07  
Good job !!
3 楼 SherJamYu 2016-09-04  
15899231727 写道
新手讨厌不贴整个工程代码的大侠

需要什么工程?maven把库加载了不就行了,还要手把手教hello world吗
2 楼 15899231727 2016-09-01  
新手讨厌不贴整个工程代码的大侠
1 楼 liubey 2015-06-16  
看zk不行吗

相关推荐

    kafka监控工具KafkaOffsetMonitor.7z

    kafka监控工具KafkaOffsetMonitor afkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有...

    kafka-eagle1.4.2

    Kafka Eagle 用于监控 Kafka 集群中 Topic 被消费的情况。包含 Lag 的产生,Offset 的变动,Partition 的分布,Owner ,Topic 被创建的时间和修改的时间等信息

    spirngboot对接Kafka示例

    你可以设置消息的键(key)和值(value),并选择性地指定分区(partition)和时间戳(timestamp)。 在消费者方面,你可以编写代码来订阅一个或多个Kafka主题,并实现对接收到的消息的处理逻辑。你可以选择使用注解或编程...

    深入剖析Kafka设计原理:如何构建高效的消息系统

    接着,文档深入探讨了Kafka中重要的机制,包括Partition副本选举、消费者消费消息的Offset记录机制以及消费者Rebalance机制。特别地,对于Kafka的生产者和消费者客户端行为进行了详细分析,包括消息的发布机制、消息...

    kafka监测工具

    KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都可以一目了然

    一个美观简洁且强大的kafka web管理工具

    发送消息(支持向指定的topic和partition发送字符串消息) 延迟消息(通过扩展使kafka支持18个级别的延迟消息) 如您需要在企业网络中使用 kafka-map ,建议先征求 IT 管理员的同意。下载、使用或分发 kafka-map 前...

    非常好的kafka项目资源,分享出来.zip

    发送消息(支持向指定的topic和partition发送字符串消息) 延迟消息(通过扩展使kafka支持18个级别的延迟消息) 截图 添加集群 集群管理 broker 主题管理 消费组 查看消费组已订阅主题 消费组详情 topic详情——分区...

    Apache_Kafka_Share

    Kafka是由LinkedIn公司用Scala语言开发的,一个分布式、分区的、多副本的、多订阅者的,基于Zookeeper协调的分布式日志系统(也可做MQ系统)。主要初衷目标是构建一个用来处理海量日志,用户行为和网站运营统计等的...

    python 消费 kafka 数据教程

    pip install --user kafka-python==1.4.3 如果报错压缩相关的错尝试安装下面的依赖 yum install snappy-devel yum install lz4-devel pip install python-snappy pip install lz4 2.生产者 #!/usr/bin/env python...

    kafka管理工具kafkaOffsetMonitor

    KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都可以一目了然。

    Kafka是一个分布式流处理平台,被广泛用于构建实时数据管道,允许你流式地处理数据

    Kafka是一个分布式流处理平台,被广泛用于构建实时数据管道,允许你流式地处理数据。...4.分区(Partition):主题的子集,数据分布于多个分区。 消息(Message):传送的数据。 6.偏移量(Offset):分区中的消息位置。

    快速学习-Kafka架构深入

    topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己...

    看完这篇还不会kafka,我跪榴莲!

    为了做到水平扩展,一个 Topic 实际是由多个 Partition 组成的,遇到瓶颈时,可以通过增加 Partition 的数量来进行横向扩容。单个 Parition 内是保证消息有序。 每新写一条消息,Kafka 就是在对应的文件 append 写,...

    Kafka 笔记

    Kafka 文章目录Kafka架构名次解释Producer(生产者)命令使用脚本常用参数举例分区策略发送返回值幂等性Consumer(消费者)命令使用脚本常用参数举例...Partition:消息分区。消息物理存储上概念 Offset:偏移量 Repli

    python kafka 多线程消费者&手动提交实例

    from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata from consumers.db_util import * from consumers.json_dispose import * from collections import OrderedDict threads = []

    KafkaDemo:kafka Java API

    #Kafka Java API案例Producer可选配置,如果不配置,则使用默认的partitioner根据key值value映射到指定的Parition props... if(offset&gt;0){ partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;}

    KafkaMonitor.zip

    KafkaOffsetMonitor(版本0.2.1)是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都...

    Kafka设计原理

    基本概念:broker:Kafka服务器,负责消息存储和转发topic:消息类别,Kafka按照topic来分类消息partition:topic的分区,一个topic可以包含多个partition,topic消息保存在各个partition上offset:消息在日志中

    Kafka高性能架构之道

    Kafka是一个Pub-Sub的消息系统,无论是发布还是订阅,都...在逻辑上,可以把一个Partition当作一个非常长的数组,可通过这个“数组”的索引(offset)去访问其数据。一方面,由于不同Partition可位于不同机器,因此可以

Global site tag (gtag.js) - Google Analytics