博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka监控及管理
阅读量:6120 次
发布时间:2019-06-21

本文共 23154 字,大约阅读时间需要 77 分钟。

hot3.png

kafka监控及管理


1. kafka监控

kafka自身没有监控管理页面,无论是进行一些管理操作还是状态的监控都要命令加一大堆记不住的参数,实在是很不方便,不过好在在github上开源了一些工具,在kafka的中也有提及到:

  • Kafka Manager: 都是以表格的形式展现数据,比较方便用来管理kafka,例如topic的创建、删除以及分区的管理等。
  • Kafka Offset Monitor: 监控消费者以及所在分区的offset,帮助分析当前的消费以及生产是否顺畅,功能比较单调但界面还可以。
  • Kafka Web Console:上已经说明了不再更新了,且建议使用Kafka manager
  • kafkat: 一个简化了的命令行工具,用来管理kafka的broker,partition,topic.
  • Capillary: 如果是kafka+storm集成使用,可以选择该工具,是一个web应用

除了前面两个,其它几个都没试用过,就算在网上查也是推荐前两个而已,kafka manager基于jmx功能比较强大,利用它做管理方面;而KafkaOffsetMonitor从它的启动参数来看应该是定时从zookeeper上获取消费者的offset,以图的形式展示,比较直观(对于一些实现Exactly once的系统,offset并不保存在zookeeper上面,它将不能使用),两者结合使用,相得益彰。

2.1. Kafka-manager

kafka manager的源码: 

官方要求的kafka版本:Kafka 0.8.1.1 或者 0.8.2.x 或者 0.9.0.x,不过使用kafka_0.10.1.0时也能正常。 
版本要求:8+

2.1.1 编译源码

为了得到部署包kafka-manager-xxxx.zip,先根据上面的地址下载源码再编译(不想这么麻烦的话,可以去一些kafka的qq群,一般群共享里都会有这个包)。kafka-manager工程是利用SBT进行构建的,所以编译之前还需要安装SBT,安装8。最后执行命令编译:

sbt clean dist

网络不好的话可能需要重复编译,成功后在target/universal目录下可以看到kafka-manager-1.3.2.1.zip

2.1.2 修改配置

把编译得到的zip包解压,在conf目录中有一个application.conf文件,最小化的配置只需要在该文件中修改kafka-manager.zkhosts参数即可:

kafka-manager.zkhosts="master:2181,slave1:2181,slave2:2181"

2.1.3 kafka启动jmx

kafka服务必需要开启JMX,否则在下一步启动kafka-manager时会出现: 

java.lang.IllegalArgumentException: requirement failed: No jmx port but jmx polling enabled! 
启动kafka服务时指定JMX_PORT值:

JMX_PORT=9999 bin/kafka-server-start.sh -daemon config/server.properties

或者修改kafka-server-start.sh,在前面加上:

export JMX_PORT=9999

2.1.4 启动kafka-manager

以运行在上为例,启动脚本为bin/kafkak-manager,该脚本会默认占用9000端口,也可以通过参数修改端口以及指定java版本运行:

nohup bin/kafka-manager -java-home /usr/java/jdk1.8.0_101/ -Dhttp.port=8081 &

启动成功后,即可以通过来访问

2.2. KafkaOffsetMonitor

源码地址: 

jar包下载: 
KafkaOffsetMonitor使用比较方便,将会被打成一个jar包,直接运行即可,作者已经把打好的包上传到github上面,执行以下命令启动:

java -cp KafkaOffsetMonitor-assembly-0.2.1.jar  com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk master,slave1,slave2:2181 --port 8082 --refresh 10.seconds  --retain 2.days

启动成功后,即可以通过来访问

3. kafka自带脚本工具

基本所有脚本都是调用kafka-run-class.sh脚本去执行一个命令模式的class.建议使用脚本时参考脚本的使用说明。

3.1. 主题管理:kafka-topics.sh

创建、删除、查看或者改变一个topic.

3.1.1. 创建topic

bin/kafka-topics.sh --zookeeper master:2181 --create --topic test --partitions 3 --replication-factor 2 --config flush.ms=1000 --config flush.messages=1

创建一个名称为test的topic,它有3个分区,每个分区两个replica,通过**–config给topic设置属性,格式为key=value,如果有多个配置属性则如上命令。这种创建方式kafka会自动把各个分区的replica分配到相应的broker,也可以在创建topic时手动指定**哪个分区的哪个replica落在指定的broker,示例命令如下:

bin/kafka-topics.sh --zookeeper master:2181 --create --topic test  --config flush.ms=1000 --config flush.messages=1 --replica-assignment 0:1,1:2

关键的配置参数为–replica-assignment,该参数不能与–partitions和–replication-factor同时出现,参数的使用格式如下:

broker_id_for_part0_replica1: broker_id_for_part0_replica2, 

broker_id_for_part1_replica1: broker_id_for_part1_replica2, 
broker_id_for_part2_replica1: broker_id_for_part2_replica2

–replica-assignment 0:1,1:2表示有两个分区,分区0的replica1在broker.id=0的kafka服务上,分区0的replica2在broker.id=1的kafka服务上;分区1的replica1在broker.id=1的kafka服务上,分区1的replica2在broker.id=2的kafka服务上。

3.1.2. 删除topic

当使用delete命令删除topic,默认只是进行标记,并没有真正的删除

Note: This will have no impact if delete.topic.enable is not set to true. 

需要在config/server.properties配置文件中开启delete.topic.enable=true

3.1.3. 查看topic

bin/kafka-topics.sh --zookeeper master:2181 --describe --topic test

describe名称为test的topic,将会显示topic的分区数、replica因子、配置参数以及各分区的分布情况(leader,replica,isr),如下图:

使用–describe时还可以结合其它一些参数对结果进行过滤:

  • topics-with-overrides:加上该参数时,只显示config被覆盖过的topic(例如使用下面的–alter修改config,或者创建topic时指定–config也算)。
  • unavailable-partitions:加上该参数时,只显示leader不可用的分区。
  • under-replicated-partitions:加上该参数时,只显示replica不足的分区。

3.1.4. 修改topic

根据–alter参数的说明,可以修改topic的分区数(目前只能是增加),修改配置config,以及修改replica(这里貌似不准确,根据官网的文档说明,如果想要增加replication factor,应该使用kafka-reassign-partitions.sh脚本)。

  • 增加分区:从2个分区增加到3个
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --partitions 3

成功后describe一下topic:

添加分区不能改变现有的数据

  • 添加修改配置 
    根据上图test主题的Configs:flush.messages=1,flush.ms=1000,尝试把flush.ms修改为2000,命令如下:
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --config flush.ms=2000

成功后:

 

  • 删除配置 
    –alter结合–delete-config一起使用可以删除某项配置,尝试删除flush.ms配置项目:
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --delete-config flush.ms

成功后: 

PS:对于使用–alter增加、修改和删除config,从0.9.0.0版本后建议使用kafka-configs.sh脚本。

3.2. 配置管理:kafka-configs.sh

这个脚本专门是用来添加,修改和删除实体的config的,其中操作的实体对象有:topic, client, user 和 broker。

3.2.1. 更新配置

添加和修改都可以统一说成更新,没有则添加,存在即修改。结合alter,add-config以及其它一些配置,例如修改broker的某个config:

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --add-config 'leader.replication.throttled.rate=700000000'
  • entity-type:实体类型,必须为(topics/clients/users/brokers)其中之一。
  • entity-name:实体名称(topic的名称,client的id,user的principal名称,broker的id)。
  • add-config:格式为’ key1:value1,key2:[v21,v22],key3:value3’,多个配置可以写在一起,比kafka-topic.sh的修改更人性化。

3.2.2. 查看配置

执行上面命令给id=0的broker添加配置leader.replication.throttled.rate后,接着查看一下该broker的config:

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --describe

结果: 

3.2.3. 删除配置

接上,删除id=0的broker的配置leader.replication.throttled.rate

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --delete-config 'leader.replication.throttled.rate'

结果: 

3.3. 消费者组管理:kafka-consumer-groups.sh

可以列出所有消费者组,查看某个消费者组的详细情况以及删除消费者组的信息(删除只适用于旧版本基于zookeeper的消费都组)。

3.3.1. 列出消费者组

Kafka默认一直会有一个“KafkaManagerOffsetCache”的消费者组,为了更加直观,先用kafka-console-consumer.sh启动一个消费都,并加入一个叫做“test_group”的组:

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic test --consumer-property group.id=test_group

接着使用以下命令列出所有的消费都组:

bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --list

已经可以看到“test_group”的消费都组了: 

3.3.2. 查看消费者组

查看消费者组的具体消费状况,结合来分析目前集群的稳健程度,执行以下命令:

bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --describe --group test_group
  • 结果: 

3.4. 均衡leader:kafka-preferred-replica-election.sh

每个分区的所有replicas叫做”assigned replicas”,”assigned replicas”中的第一个replicas叫”preferred replica”,刚创建的topic一般”preferred replica”(优先副本)是leader。 

各分区的读取写请求都是由leader来接收处理的,那么当然希望各分区的leader可以均衡地分布在各个broker之上,做到均衡负载,提高集群稳定性以及充分利用资源。一般在创建topic时,kafka都会默认把leader平均分配,但当某个broker宕掉后,会导致该broker上的leader转移到其它的broker上去,导致机群的负载不均衡,就算宕掉的broker恢复正常,它上面已经没有leader。可以使用kafka-preferred-replica-election.sh工具令到恢复后的broker上的优先副本重新选举成为leader,这样又恢复到了宕掉之前的状态。

下面来模拟一下整个过程,首先创建一个topic,3个分区,每个分区的leader分别在3个broker上面:

分区0的leader已经从broker0转移到了broker1了,在Isr中也看不到原本broker0的两个replica。最后重新启动broker0并执行以下命令:

bin/kafka-preferred-replica-election.sh --zookeeper master:2181

再观察test的分区情况: 

可以看到test已经恢复到最初的leader分布情况了。默认是对所有分区进行优先副本选举(preferred replica election),如果想指定操作某些分区,则需要配合–path-to-json-file参数,例如test有0,1,2三个分区,只想操作1,2分区,首先编译test_election.json文件,内容如下:

{“partitions”:[{“topic”: “test”, “partition”: 1}, {“topic”: “test”, 

“partition”: 2}]}

然后执行以下命令:

bin/kafka-preferred-replica-election.sh --zookeeper master:2181 --path-to-json-file test_election.json

PS:其实可以配置kafka自动平衡leader的,在server.properties文件中设置:**auto.leader.rebalance.enable=true**即可,而该配置默认是已经打开的,想验证的话可以重启一个broker,隔一段时间后会发现leader会自动恢复。

3.5. 扩容重分区:kafka-reassign-partitions.sh

当有新的broker节点加入到已经在使用的集群,kafka是不会自动均衡原本的数据到新节点的,需要手动对分区进行迁移,使得新节点可以对外提供服务。(对于后来创建和topic则不需要)。

3.5.1. 生成迁移计划

首先肯定要知道需要对哪些topic进行迁移,且明确哪个分区需要迁移到哪个broker节点。现有一个分区test,具体如下图:

手动编辑一个json文件(例如命名为topics-to-move.json),表示哪些topic是需要迁移的,内容如下(可以指定多个topics):

{“topics”: [{“topic”: “test”}], “version”:1 }

想要把test的所有分区迁移到broker1,2,执行以下命令生成迁移计划:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
  • topics-to-move-json-file:上面创建的topics-to-move.js文件

  • broker-list:分区需要迁移到的broker,格式如命令

  • generate:表示生成分配规则,内容是json串 

    执行结果如下:

显示了当前的分配规则(可以用作回滚)以及新生成的分配规则,把内容保存到文件(expand-cluster-reassignment.json),当然,也可以手动修改里面的内容,只要符合格式即可:

{“version”:1,”partitions”:[{“topic”:”test”,”partition”:1,”replicas”:[2,1]},{“topic”:”test”,”partition”:2,”replicas”:[1,2]},{“topic”:”test”,”partition”:0,”replicas”:[1,2]}]}

3.5.2. 执行迁移计划

根据上一步生成的分配规则expand-cluster-reassignment.json启动迁移,执行以下命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

然后describe一下,查看新的分区分配情况:

可以看到现在所有分区的replica都已经全部迁移到了broker1,2上面。

3.5.3. 验证迁移计划

还是根据分配规则expand-cluster-reassignment.json验证分区是否分配成功,执行以下命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

执行结果如下: 

至此,分区的迁移已经完成。其实已经对分区规则熟悉的话,可以跳过生成迁移计划这步,直接编写expand-cluster-reassignment.json,然后执行验证。

3.6. 增加副本:kafka-reassign-partitions.sh

为分区增加副本,还是使用kafka-reassign-partitions.sh命令,然后编辑副本规则json文件即可。现有以下topic:

分区0有两个replica,分别在broker1,2上,现在准备在broker0上添加一个replica,先创建副本分配json文件(increase-replication-factor.json),内容如下:

{“version”:1, 

“partitions”:[{“topic”:”test”,”partition”:0,”replicas”:[0,1,2]}]}

然后指定increase-replication-factor.json执行下面的命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file increase-replication-factor.json --execute

接着,同样使用increase-replication-factor.json来验证是否成功:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify

执行结果如下: 

或者describe一下topic:

4.1. Kafka管理工具介绍

内部提供了许多管理脚本,这些脚本都放在$KAFKA_HOME/bin目录下,而这些类的实现都是放在源码的kafka/core/src/main/[Scala](http://lib.csdn.net/base/scala)/kafka/tools/路径下。

Consumer Offset Checker

  Consumer Offset Checker主要是运行kafka.tools.ConsumerOffsetChecker类,对应的脚本是kafka-consumer-offset-checker.sh,会显示出Consumer的Group、Topic、分区ID、分区对应已经消费的Offset、logSize大小,Lag以及Owner等信息。

如果运行kafka-consumer-offset-checker.sh脚本的时候什么信息都不输入,那么会显示以下信息:

[iteblog@www.iteblog.com /\]$ bin/kafka-consumer-offset-checker.shCheck the offset of your consumers.Option                                  Description                            -----\-                                  ----------\-                            --broker-info                           Print broker info                      --group                                 Consumer group.                        --help                                  Print this message.                    --retry.backoff.ms 
Retry back-off to use for failed offset queries. (default: 3000) --socket.timeout.ms
Socket timeout to use when querying for offsets. (default: 6000) --topic Comma-separated list of consumer topics (all topics if absent). --zookeeper ZooKeeper connect string. (default: localhost:2181)

我们根据提示,输入的命令如下:

[iteblog@www.iteblog.com /\]$ bin/kafka-consumer-offset-checker.sh --zookeeper www.iteblog.com:2181 --topic test --group spark --broker-infoGroup           Topic      Pid Offset          logSize         Lag             Ownerspark    test       0   34666914        34674392        7478            nonespark    test       1   34670481        34678029        7548            nonespark    test       2   34670547        34678002        7455            nonespark    test       3   34664512        34671961        7449            nonespark    test       4   34680143        34687562        7419            nonespark    test       5   34672309        34679823        7514            nonespark    test       6   34674660        34682220        7560            noneBROKER INFO2 -> www.iteblog.com:90925 -> www.iteblog.com:90934 -> www.iteblog.com:90947 -> www.iteblog.com:90951 -> www.iteblog.com:90963 -> www.iteblog.com:90976 -> www.iteblog.com:9098

4.2.Dump Log Segment

  有时候我们需要验证日志索引是否正确,或者仅仅想从log文件中直接打印消息,我们可以使用kafka.tools.DumpLogSegments类来实现,先来看看它需要的参数:

[iteblog@www.iteblog.com /\]$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.Option                                  Description                            -----\-                                  ----------\-                            --deep-iteration                        if set, uses deep instead of shallow                                             iteration                            --files 
REQUIRED: The comma separated list of data and index log files to be dumped--key-decoder-class if set, used to deserialize the keys. This class should implement kafka. serializer.Decoder trait. Custom jar should be available in kafka/libs directory. (default: kafka. serializer.StringDecoder) --max-message-size
Size of largest message. (default: 5242880) --print-data-log if set, printing the messages content when dumping data logs --value-decoder-class if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory. (default: kafka.serializer. StringDecoder) --verify-index-only if set, just verify the index log without printing its content

  很明显,我们在使用kafka.tools.DumpLogSegments的时候必须输入--files,这个参数指的就是中Topic分区所在的绝对路径。分区所在的目录由config/server.properties文件中log.dirs参数决定。比如我们想看/home/q/kafka/kafka_2.10-0.8.2.1/data/test-4/00000000000034245135.log日志文件的相关情况可以 使用下面的命令:

[iteblog@www.iteblog.com /\]$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /iteblog/data/test-4/00000000000034245135.logDumping /home/q/kafka/kafka_2.10-0.8.2.1/data/test-4/00000000000034245135.logStarting offset: 34245135offset: 34245135 position: 0 isvalid: true payloadsize: 4213 magic: 0 compresscodec: NoCompressionCodec crc: 865449274 keysize: 4213offset: 34245136 position: 8452 isvalid: true payloadsize: 4657 magic: 0 compresscodec: NoCompressionCodec crc: 4123037760 keysize: 4657offset: 34245137 position: 17792 isvalid: true payloadsize: 3921 magic: 0 compresscodec: NoCompressionCodec crc: 541297511 keysize: 3921offset: 34245138 position: 25660 isvalid: true payloadsize: 2290 magic: 0 compresscodec: NoCompressionCodec crc: 1346104996 keysize: 2290offset: 34245139 position: 30266 isvalid: true payloadsize: 2284 magic: 0 compresscodec: NoCompressionCodec crc: 1930558677 keysize: 2284offset: 34245140 position: 34860 isvalid: true payloadsize: 268 magic: 0 compresscodec: NoCompressionCodec crc: 57847488 keysize: 268offset: 34245141 position: 35422 isvalid: true payloadsize: 263 magic: 0 compresscodec: NoCompressionCodec crc: 2964399224 keysize: 263offset: 34245142 position: 35974 isvalid: true payloadsize: 1875 magic: 0 compresscodec: NoCompressionCodec crc: 647039113 keysize: 1875offset: 34245143 position: 39750 isvalid: true payloadsize: 648 magic: 0 compresscodec: NoCompressionCodec crc: 865445580 keysize: 648offset: 34245144 position: 41072 isvalid: true payloadsize: 556 magic: 0 compresscodec: NoCompressionCodec crc: 1174686061 keysize: 556offset: 34245145 position: 42210 isvalid: true payloadsize: 4211 magic: 0 compresscodec: NoCompressionCodec crc: 3691302513 keysize: 4211offset: 34245146 position: 50658 isvalid: true payloadsize: 2299 magic: 0 compresscodec: NoCompressionCodec crc: 2367114411 keysize: 2299offset: 34245147 position: 55282 isvalid: true payloadsize: 642 magic: 0 compresscodec: NoCompressionCodec crc: 4122061921 keysize: 642offset: 34245148 position: 56592 isvalid: true payloadsize: 4211 magic: 0 compresscodec: NoCompressionCodec crc: 3257991653 keysize: 4211offset: 34245149 position: 65040 isvalid: true payloadsize: 2278 magic: 0 compresscodec: NoCompressionCodec crc: 2103489307 keysize: 2278offset: 34245150 position: 69622 isvalid: true payloadsize: 269 magic: 0 compresscodec: NoCompressionCodec crc: 792857391 keysize: 269offset: 34245151 position: 70186 isvalid: true payloadsize: 640 magic: 0 compresscodec: NoCompressionCodec crc: 791599616 keysize: 640

可以看出,这个命令将Kafka中Message中Header的相关信息和偏移量都显示出来了,但是没有看到日志的内容,我们可以通过--print-data-log来设置。如果需要查看多个日志文件,可以以逗号分割。

4.3. 导出Zookeeper中Group相关的偏移量

  有时候我们需要导出某个Consumer group各个分区的偏移量,我们可以通过使用Kafka的kafka.tools.ExportZkOffsets类来满足。来看看这个类需要的参数:

[iteblog@www.iteblog.com /\]$ bin/kafka-run-class.sh kafka.tools.ExportZkOffsetsExport consumer offsets to an output file.Option                                  Description                            -----\-                                  ----------\-                            --group                                 Consumer group.                        --help                                  Print this message.                    --output-file                           Output file                            --zkconnect                             ZooKeeper connect string. (default:                                              localhost:2181)

我们需要输入Consumer group,Zookeeper的地址以及保存文件路径:

[iteblog@www.iteblog.com /\]$ bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --group spark --zkconnect www.iteblog.com:2181 --output-file ~/offset[iteblog@www.iteblog.com /\]$ vim ~/offset /consumers/spark/offsets/test/3:34846274/consumers/spark/offsets/test/2:34852378/consumers/spark/offsets/test/1:34852360/consumers/spark/offsets/test/0:34848170/consumers/spark/offsets/test/6:34857010/consumers/spark/offsets/test/5:34854268/consumers/spark/offsets/test/4:34861572

注意,--output-file参数必须在指定,否则会出错。

4.4 .通过JMX获取metrics信息

  我们可以通过kafka.tools.JmxTool类打印出Kafka相关的metrics信息。

[iteblog@www.iteblog.com /\]$ bin/kafka-run-class.sh kafka.tools.JmxToolDump JMX values to standard output.Option                                  Description                            -----\-                                  ----------\-                            --attributes 
The whitelist of attributes to query. This is a comma-separated list. If no attributes are specified all objects will be queried. --date-format
The date format to use for formatting the time field. See java.text. SimpleDateFormat for options. --help Print usage information. --jmx-url
The url to connect to to poll JMX data. See Oracle javadoc for JMXServiceURL for details. (default: service:jmx:rmi:///jndi/rmi://: 9999/jmxrmi) --object-name
A JMX object name to use as a query. This can contain wild cards, and this option can be given multiple times to specify more than one query. If no objects are specified all objects will be queried. --reporting-interval
Interval in MS with which to poll jmx stats. (default: 2000)

可以这么使用

[iteblog@www.iteblog.com /\]$ bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://www.iteblog.com:1099/jmxrmi

运行上面命令前提是在启动kafka集群的时候指定export JMX_PORT= ,这样才会开启JMX。然后就可以通过上面命令打印出Kafka所有的metrics信息。

4.5. Kafka数据迁移工具

  这个工具主要有两个:kafka.tools.KafkaMigrationToolkafka.tools.MirrorMaker。第一个主要是用于将Kafka 0.7上面的数据迁移到Kafka 0.8(

[iteblog@www.iteblog.com /\]$ bin/kafka-run-class.sh kafka.tools.KafkaMigrationTool --kafka.07.jar kafka-0.7.19.jar --zkclient.01.jar zkclient-0.2.0.jar --num.producers 16 --consumer.config=sourceCluster2Consumer.config --producer.config=targetClusterProducer.config --whitelist=.*\[iteblog@www.iteblog.com /\]$ bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"

4.6. 日志重放工具

  这个工具主要作用是从一个Kafka集群里面读取指定Topic的消息,并将这些消息发送到其他集群的指定topic中:

[iteblog@www.iteblog.com /\]$ bin/kafka-replay-log-producer.sh Missing required argument "\[broker-list\]"Option                                  Description                            -----\-                                  ----------\-                            --broker-list 
REQUIRED: the broker list must be specified. --inputtopic
REQUIRED: The topic to consume from. --messages
The number of messages to send. (default: -1) --outputtopic
REQUIRED: The topic to produce to --property
A mechanism to pass properties in the form key=value to the producer. This allows the user to override producer properties that are not exposed by the existing command line arguments --reporting-interval
Interval at which to print progress info. (default: 5000) --sync If set message send requests to the brokers are synchronously, one at a time as they arrive. --threads
Number of sending threads. (default: 1)--zookeeper
REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over. (default: 127.0.0.1:2181)

4.7.Simple Consume脚本

  kafka-simple-consumer-shell.sh工具主要是使用Simple Consumer API从指定Topic的分区读取数据并打印在终端:

bin/kafka-simple-consumer-shell.sh --broker-list www.iteblog.com:9092 --topic test --partition 0

4.8. 更新Zookeeper中的偏移量

  kafka.tools.UpdateOffsetsInZK工具可以更新Zookeeper中指定Topic所有分区的偏移量,可以指定成 earliest或者latest:

[iteblog@www.iteblog.com /\]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZKUSAGE: kafka.tools.UpdateOffsetsInZK$ \[earliest | latest\] consumer.properties topic

需要指定是更新成earliest或者latest,consumer.properties文件的路径以及topic的名称

转载于:https://my.oschina.net/suventop/blog/1859286

你可能感兴趣的文章
phpcms v9栏目列表调用每一篇文章内容方法
查看>>
python 自定义信号处理器
查看>>
luov之SMTP报错详解
查看>>
软件概要设计做什么,怎么做
查看>>
dwr
查看>>
java的特殊符号
查看>>
word2010中去掉红色波浪线的方法
查看>>
fabric上下文管理器(context mangers)
查看>>
JQuery-EasyUI Datagrid数据行鼠标悬停/离开事件(onMouseOver/onMouseOut)
查看>>
并发和并行的区别
查看>>
php小知识
查看>>
Windows下安装、运行Lua
查看>>
Nginx 反向代理、负载均衡、页面缓存、URL重写及读写分离详解(二)
查看>>
初识中间件之消息队列
查看>>
MyBatis学习总结(三)——优化MyBatis配置文件中的配置
查看>>
Spring常用注解
查看>>
我的友情链接
查看>>
PCS子层有什么用?
查看>>
查看端口,关闭端口
查看>>
代码托管平台简介
查看>>