博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka常用运维命令
阅读量:6224 次
发布时间:2019-06-21

本文共 5858 字,大约阅读时间需要 19 分钟。

列出所有topic:

bin/kafka-topics.sh --zookeeper localhost:2181 --list
说明:其实就是去检查zk上节点的/brokers/topics子节点,打印出来

创建topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic order_ledger_check_complete_test --partitions 4 --replication-factor 3
说明:线上环境我们会将自动创建topic禁用掉,改为手动创建(auto.create.topics.enable=false),parttitions和replication-factor是两个必备选项,第一个参数是消费并行度的一个重要参数,第二个极大提高了topic的可用性.备份因子默认是1,相当于没有备份,注意其值不能大于broker个数,否则会报错。同时还可以指定topic级别的配置参数,这种特定的配置会覆盖掉默认配置,并且存储在zookeeper的/config/topics/[topic_name]节点数据里。--alter  --config  --deleteConfig

删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --topic payment_completed --delete
说明:在0.8.2.1之前的版本一般不建议之行删除操作,因为有各种各样的bug存在,目前的版本稳定些,同时我们需要将配置参数打开(delete.topic.enable=true),删除操作其实是通过更改一个zk节点,由另外的删除线程异步做的topicdeletionmanager。
增加partitions:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10
说明:只能增加,不能减少。如果原有分散策略是hash的方式,将会受影响。发送端(默认10分钟会刷新本地存储元信息)和消费端都无需重启即可生效。
增加broker:
需要将安装包拷贝到对应服务器上,修改broker.id,不能与现有系统中broker id冲突,然后创建好对应的日志目录和数据目录等。注意的是,现有partition会继续保持到其他broker上面,新创建topic才可能分配到该新机器上面。如果需要保持整个kafka集群比较均衡,需要手动对现有数据进行迁移,尽量迁移非leader的partition,利用partition reassignment tools
1.--generate
也可以对候选的进行适当调整

[hadoop@i-o1oh kafka_2.10-0.8.2.1]$ cat topics-to-move.json

{"version":1,

 "partitions":[{"topic":"production_process_flow_test"},

               {"topic":"production_process_flow_dev"}

}

 

[hadoop@i-o1ohkafka_2.10-0.8.2.1]$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

Current partition replica assignment

 

{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_dev","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}

Proposed partition reassignment configuration

 

{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":2,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":1,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":3,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}

2.--execute 

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file a.json --execute

Current partition replica assignment

 

{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_dev","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}

 

Save this to use as the --reassignment-json-file option during rollback

Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"production_process_flow_test","partition":1,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":3,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":1,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":2,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":2,"replicas":[2,1]}]}

3.--verify

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file a.json --verify

Status of partition reassignment:

Reassignment of partition [production_process_flow_test,1] completed successfully

Reassignment of partition [production_process_flow_dev,3] completed successfully

Reassignment of partition [production_process_flow_test,3] completed successfully

Reassignment of partition [production_process_flow_dev,0] completed successfully

Reassignment of partition [production_process_flow_dev,1] completed successfully

Reassignment of partition [production_process_flow_test,0] completed successfully

Reassignment of partition [production_process_flow_test,2] completed successfully

Reassignment of partition [production_process_flow_dev,2] completed successfully

下线某台机器分两种情况:

1.数据完整:直接将数据目录拷贝到新机器上,同时保持kafka安装配置不变,启动起来就行
2.数据不完整:这种情况下比较麻烦,需要对所有topic进行describe,如果发现有replica在这台下线机器上,需要用partition reassignment tool进行迁移工作
增减replica:
只需要编辑 --reassignment-json-file,添加或者减少broker id即可。其他操作可以依赖partition reassignment tool 的--execute --verify.
消费消息
watch --interval=2 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic payment_completed_dev --from-beginning
查询消费信息
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group group_order_ledger --topic payment_completed_dev
写入消息性能测试
bin/kafka-producer-perf-test.sh --broker-list kafka1.weibo.com:9092 --batch-size 1 --message-size 1024 --messages 10000 --sync --topics topic_test

转载于:https://www.cnblogs.com/heidsoft/p/7693040.html

你可能感兴趣的文章
MySQL数据库学习笔记(五)----MySQL字符串函数、日期时间函数
查看>>
NPOI 导出excel带图片,可控大小
查看>>
算法数据结构(一)-B树
查看>>
阿里云官方教程 Linux 系统挂载数据盘
查看>>
(数组)众数问题
查看>>
如何写一个简单的手写识别算法?
查看>>
JavaScript学习笔记——函数
查看>>
atitit.基于 Commons CLI 的命令行原理与 开发
查看>>
Blog CSS
查看>>
git workflow 原文 以及缺点
查看>>
QT对话框中show和exec的区别
查看>>
Android和C#实时视频传输Demo
查看>>
java并发编程学习:如何等待多个线程执行完成后再继续后续处理(synchronized、join、FutureTask、CyclicBarrier)...
查看>>
Mysql Binlog三种格式介绍及分析
查看>>
70、二维码生成+圆形头像
查看>>
Pazera Free Audio Extractor 中文版 - 轻松将视频背景音乐/对话音频提取出来的免费软件...
查看>>
读取spring配置文件的方法(spring读取资源文件)
查看>>
PostConstruct
查看>>
MyEclipse------快速读取特定目录下的文件的内容(字节输入流)
查看>>
Linq查询操作之排序操作
查看>>