安装kafka
- 格式:docx
- 大小:35.92 KB
- 文档页数:4
Kafka安装及配置过程⼀.安装2.Kafka与scala版本也有关系。
3.kafka安装之前需要提前配置好hadoop集群与zookeeper集群。
kafka是依赖于zookeeper集群的。
hadoop集群:master,slave1,slave24.解压:tar -zxvf kafka_2.12-2.5.1.tgz -C /usr/hadoop⼆.修改配置⽂件进⼊解压⽂件 /usr/hadoop/kafka_2.12-2.5.1/config 这个⽬录下1.修改 server.properties#broker的全局唯⼀编号,不能重复broker.id=0#⽤来监听链接的端⼝,producer或consumer将在此端⼝建⽴连接port=9092#处理⽹络请求的线程数量work.threads=3#⽤来处理磁盘IO的线程数量num.io.threads=8#发送套接字的缓冲区⼤⼩socket.send.buffer.bytes=102400#接受套接字的缓冲区⼤⼩socket.receive.buffer.bytes=102400#请求套接字的缓冲区⼤⼩socket.request.max.bytes=104857600#kafka消息存放的路径log.dirs=/usr/hadoop/kafka_2.12-2.5.1/data#topic在当前broker上的分⽚个数num.partitions=2#⽤来恢复和清理data下数据的线程数量num.recovery.threads.per.data.dir=1#segment⽂件保留的最长时间,超时将被删除log.retention.hours=168#滚动⽣成新的segment⽂件的最⼤时间log.roll.hours=168#⽇志⽂件中每个segment的⼤⼩,默认为1Glog.segment.bytes=1073741824#周期性检查⽂件⼤⼩的时间log.retention.check.interval.ms=300000#⽇志清理是否打开log.cleaner.enable=true#broker需要使⽤zookeeper保存meta数据zookeeper.connect=master:2181,slave1:2181,slave2:2181#zookeeper链接超时时间zookeeper.connection.timeout.ms=6000#partion buffer中,消息的条数达到阈值,将触发flush到磁盘log.flush.interval.messages=10000#消息buffer的时间,达到阈值,将触发flush到磁盘log.flush.interval.ms=3000#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除delete.topic.enable=true#此处的为本机IP(重要),如果不改,则客户端会抛出:Producerconnection to localhost:9092 unsuccessful 错误!=master注:broker.id=0 这个配置在每个配置节点上要修改不同的值。
kafka可视化⼯具安装及简单使⽤⼀、安装双击kafkatool_64bit.exe安装kafka可视化⼯具,并且C:\Windows\System32\drivers\etc配置HOSTS,打开HOSTS并添加:HOSTS添加:HOSTS添加的域名,是kafka安装服务器的IP ⽤户名,必要时登录linux服务器检查⼀下⼆、简单使⽤1、在桌⾯找到Kafka Tool 2.0快捷图标,双击进⼊可视化⼯具2、左上⾓File-Add New Connection...,添加链接(1)File-Add New Connection...-Add Cluster,弹框配置以下内容General配置:Cluster name=192.168.102.124:2181Kafka Cluster Version=0.11Zookeeper Host=192.168.102.124Zookeeper Port=2181Advanced配置:Bootstrap servers=192.168.102.124:9092(2)其他不⽤填写,【Test】成功,点【是】,即添加到可视化⼯具3、推送数据前修改推送数据格式配置,修改后进⼀步推送数据(1)推送数据前修改推送数据格式配置:Cluster name连接下双击Topics,展开topic,找到⽬标topic。
由于kafka tool默认是⽤⼗六进制传⼊,点击topic,选择Properties,Content Types下的Key和Message选择String,点击【update】-【refresh】,才能使⽤正常的string格式。
(2)展开topic下的Partions,点击Partion0,选择Data,选择【+】,(3)推送数据:【+】,弹出Add Message框,Key下value选择Enter Manually[Text],Message下value选择Enter Manually[Text],填写完毕后【Add】【Add】完后会弹框提⽰是否继续添加下⼀条数据,若要继续则【是】,若不需要则【否】关闭弹框4、除了添加数据,还可以查看历史推送数据,topic下-Partitions-Partition 0,点击按钮则在列表显⽰历史推送数据点击,列表下⽅选择TAB Message,可以看到原json本⽂内容均⾃⼰编写,若觉得不够细节可以进⼀步参考:。
Apache Kafka一、概念描述1、1 消息队列1.消息队列已经成为企业系统内部的核心通信手段,它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。
2.它是类似于数据库一样需要独立部署在服务器上的一种应用,提供接口给其他系统调用。
3.消息中间件是遵守JMS(java message service)规范的一种软件(大多数消息中间件遵守JMS规范)。
要使用Java消息服务,你必须要有一个JMS提供者,管理会话和队列。
现在既有开源的提供者也有专有的提供者。
开源的提供者包括:Apache Active MQ、Kafka、Web Methods、阿里的Rocket MQ等。
1、2 什么是Kafka1.Apache Kafka是消息中间件的一种。
2.Kafka是一个分布式的、可分区的、可复制的消息系统。
1、3名词解析1.producer:生产者,生产“消息”。
2.consumer:消费者,消费“消息”。
3.topic:你把它理解为标签,生产者每生产出来一个“消息”就贴上一个标签(topic),消费者可不是谁生产的“消息”都消费,这样不同的生产者生产出来的“消息”,消费者就可以选择性的“消费”了。
4.broker:就是消息存储器。
二、Kafka安装1、Kafka 下载2、上传centos8 ,并解压1.[root@localhost home]# ll2.总用量 1107443.drwxr-xr-x. 2 root root 103 10月 26 21:56 app4.drwxr-xr-x. 3 root root 24 10月 25 00:10 dockerApache5.drwxr-xr-x. 2 root root 6 12月 4 2022 jdk6.-rw-r--r--. 1 root root 113400977 11月 4 20:51 kafka_2.12-3.6.0.tgz7.[root@localhost home]# clear8.[root@localhost home]# ll9.总用量 11074410.drwxr-xr-x. 2 root root 103 10月 26 21:56 app11.drwxr-xr-x. 3 root root 24 10月 25 00:10 dockerApache12.drwxr-xr-x. 2 root root 6 12月 4 2022 jdk13.-rw-r--r--. 1 root root 113400977 11月 4 20:51 kafka_2.12-3.6.0.tgz14.[root@localhost home]# tar -zxvf kafka_2.12-3.6.0.tgz15.kafka_2.12-3.6.0/3、启动zookeeper1.-rw-r--r--. 1 root root 28184 9月 29 00:56 NOTICE2.drwxr-xr-x. 2 root root 44 9月 29 01:00 site-docs3.[root@localhost kafka_2.12-3.6.0]# ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &4.[1] 18665.[root@localhost kafka_2.12-3.6.0]#4、启动Kafka1.[root@localhost kafka_2.12-3.6.0]# ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &2.[1] 18663.[root@localhost kafka_2.12-3.6.0]# ./bin/kafka-server-start.sh -daemon config/server.properties &4.[2] 22625.[1] 已完成 ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties6.[root@localhost kafka_2.12-3.6.0]#三、Kafka 使用测试5、创建一个topics1.[root@localhost kafka_2.12-3.6.0]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --create--topic test1 --partitions 2 --replication-factor 12.Created topic test1.3.[root@localhost kafka_2.12-3.6.0]#6、查看当前话题1.[root@localhost kafka_2.12-3.6.0]# ./bin/kafka-topics.sh --list --bootstrap-server localhost:90922.test13.[root@localhost kafka_2.12-3.6.0]#7、生产消息Ctr+c结束1.[root@localhost kafka_2.12-3.6.0]# ./bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:90922.>my name is even8、消费消息1.[root@localhost kafka_2.12-3.6.0]# ./bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:90922.my name is even9、关闭Kafka1.[root@localhost bin]# ./kafka-server-stop.sh2.[root@localhost bin]# ./zookeeper-server-stop.sh四、Java连接测试4、1 创建生成者1、创建一个Spring Boot项目Spring Boot 整合Kafka1.<dependency>2. <groupId>org.springframework.kafka</groupId>3. <artifactId>spring-kafka</artifactId>4.</dependency>2、在配置文件中加入配置1.server.port=99922. ###########【Kafka集群】###########3.spring.kafka.bootstrap-servers=localhost:90924. #==================================【初始化生产者配置】==================================#5. # 重试次数6.spring.kafka.producer.retries=07. # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)8.spring.kafka.producer.acks=19. # 批量大小10.spring.kafka.producer.batch-size=1638411. # 提交延时12.spring.kafka.producer.properties.linger.ms=013. # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka14. # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了15. # 生产端缓冲区大小16.spring.kafka.producer.buffer-memory = 3355443217. # Kafka提供的序列化和反序列化类18.spring.kafka.producer.key-serializer=mon.serialization.StringSerializer19.spring.kafka.producer.value-serializer=mon.serialization.StringSerializer20.# 自定义分区器21.#spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner3、编写测试代码1.@Component2.public class KafkaProducer {3. @Autowired4.private KafkaTemplate<String,String> kafkaTemplate;5./**6. * 发送消息7. */8.public void sendMessage() {9.try{10.//生产消息11. String message = "ruoYi !测试ruoYi ";12. ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send("ruoYi", message);13. listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {14. @Override15.public void onSuccess(SendResult<String, String> result) {16. System.out.println("sendMessage success");17. }18. @Override19.public void onFailure(Throwable ex) {20. System.out.println("sendMessage error");21. }22. });23. }catch (Exception e){24. System.out.println("sendMessage exception");25. }26.}27.}4、发起测试1.@Autowired2.private KafkaProducer kafkaProducer;3.@RequestMapping("/hello")4.public String hello(){5. System.out.println("------->测试生产者发送消息");6. kafkaProducer.sendMessage();7.return"kafka消息已发送.";8.}5、查看Kafka 是否发送成功(若无法发送成功,可以尝试关闭防火墙)6、常见备注kafka报错:Connection to node 1 (localhost/127.0.0.1:9092) could not be established.原因:没有在kafka的config/server.properties文件中配置listeners=PLAINTEXT:IP地址:9092如果不配置具体IP地址,则默认使用localhost,不在kafka所在的机器上连接时就会报上述错误,因为kafka在zookeeper中注册时使用的localhost。
实训6.3 Kafka集群部署L实训I目的通过本实训,了解Kafka的基本概念,理解Kafka工作原理,安装部署Kafka 集群。
2 .实训内容本实训通过安装和配置Kafka,学会执行Kafka,并且校验集群是否搭建成功。
3 .实训要求以小组为单元进行实训,每小组5人,小组自协商选一位组长,由组长安排和分配实训任务,具体参考实训操作环节。
小组成员需确保ZoOKeePer集群是否安装准确。
4 .准备知识(1) Kafka概念趣解①PrOdUCer:生产者,就是它来生产“鸡蛋”的。
②COnSUmer:消费者,生出的“鸡蛋”它来消费。
③TOPic:把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(Topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃” 了。
©Broker:就是篮子了。
©Partition: Partition是物理上的概念,每个Topic包含一个或多个Partition o©Consumer Group :每个Consumer 属于一个特定的Consumer Group (可为每个Consumer指定group name,若不指定group name,则属于默认的group) 如果从技术角度,TOPiC标签实际就是队列,生产者把所有“鸡蛋(消息)” 都放到对应的队列里了,消费者到指定的队列里取。
5 .实训步骤(1)安装和配置Kafka①将kafka_2.13-3.3.1 .tgz压缩包上传至master节点的/root/PaCkage目录下。
②解压kafka_2.13-3.3.Ltgz,这里解压在/root/PaCkage 目录下:Cd ∕root∕packagetar -zxvf kafka_2.13-3.3. Ltgz -C ∕opt∕software∕(2)配置KafkaZooKeeper zookeeper.connect=localhost:2181 localhost,修改为安装ZoOKeePer 的三台节点,即master、slavel> slave2,主机刍里6-31ZooKeeper图6-32修改ZoOKeePer日志路径(3)复制master 的Kafka 到SIaVe 1、slave2(4)slavel> slave2Kafka①配置SlaVel的配置文件(修改broker.id)slave2broker.id)新打开一个终端,登录SlaVe2并执行vim ∕opt∕software∕kafka2.13-3.3.1 ∕config∕server.propertiesbroker.id=2⑤拷贝master节点的环境变量到SlaVel和slave2SlaVel SlaVe2节点中执行以下命令,使配置生效:source ZetcZprofile(5)校验Kafka①启动Kafka需确保master、slavel> SlaVe2的ZOOKeePer已启动,如未启动则用下面指令启动(执行jps,有QUorUmPeerMain进程则表示已启动):Kafka6-33图6-33查看各节点上的进程6.实训总结本次实训注意要安装配置好ZooKeeper, Kafka的安装部署与ZooKeeper的安装部署大同小异,启动的时候记得要汆启动ZOoKeeper,对于Kafka的原理要认真理解。
Kafka安装配置及使用说明(铁树2018-08-08)(Windows平台,5个分布式节点,修改消息大小,调用程序范例)1安装配置采用5台服务器作为集群节点,IP地址为:XX.XX.0.12-XX.XX.0.16.每台机器依次安装配置JDK zookeeper、kafka,先安装完一台机器,然后拷贝到其他机器,再修改配置文件。
1.1 JDK安装配置JDK版本:jdk1.7.0_51_x64 解压版(jdk1.7.0_51_x64.rar )解压到C盘kafka目录下,如图所示首特KtBAHXJ'.bin2010.1/30 王币K =S h db2013/7/3V 七比5:件耒.incbde2018/7/30 £36上沁201^7/® 2^36h瞬了20147/30 2\36.[ih201^7/30 J5 3-B11 CQPvwGHrr2013/1^15 2CTO1文件 4K@LKtN$E Wli/4/10 *141KB 丄和d] README201V4/10M4HTML 応1KB理|即■艸2QW4/10 中141呵.ire2013/12/1B 7002iMnRSf? J1P 尹也珈u计刖...TM [RDP4HraECEN^r R EADME2014/4^10 tU17] KB & =堆址H {3□ THLRO^ftRIVUCENSEREADWt-JAVA-FX2014/4/10 RL4123K£lID 宓亍2S .tic设置环境变量:JAVA_HOMEC:\kafka\jdk1.7.0_51_x64PATH C:\kafka\jdk1.7.0_51_x64\bin麥毘名型):jm_HONE妄重值也): C:莎毎i\j dkl. 7.0_51_1«64|1.2 zookeeper 安装配置1.2.1解压安装zookeeper 版本:3412 (zookeeper-3412.tar.gz )詞匸F 2仝召耳丰” U卡话諮文弍矣解压到C盘kafka目录下,如图所示计垃巧■衣抽剧&隹;]* ki祖亓* 5-4.121.2.2创建zookeeper数据目录和日志目录zkdata #存放快照C:\kafka\zookeeper-3412\zkdatazkdatalog#存放日志C:\kafka\zookeeper-3412\zkdatalog1.2.3修改配置文件进入到“C:\kafka\zookeeper-3412 ”目录下的conf目录中,复制zoo_sample.cfg (官方提供的zookeeper的样板文件),重命名为zoo.cfg (官方指定的文件命名规则)« 本為iE 誣心)►ksika Ik ipolcwpe—3 <112 conf■- it itciad默认内容:1 t Th 电 r.umber af niLLisecnnds of 色亀亡h 1 ick2 iLclriijie^SOljJ t Ihe ruFibor of t £cks that tho iruit ial 4 * jyncbut cnizat i on pharc ca- tak?4 Ihe hunbBi : of ticts that can pajs belvten7 $ sendw a request 吐id. eettinz © icknovl^dgement 耳 syricLlJiitsBq i 十h* di reetery whfrp +t.n im 现i 雪hat is licr^ri ・t do net 工令 /tup for stor^c^ /tup htre is jurt 11 t spikes ida"t i :3Qkc?pcr# the port 可t which -the clients 呼ill coimcrtU cLientFort^2131f the jisKimum minbeT of client co?mecT laris.£ zncresse this i_£ you r.eEd to hand L e no re clxent^ 17tnsjcCl ier±Cra:n3= 5 □IS # 車 fie sure to rsal the nLiirtenan.es scctzon cf tlie20 幸 atLnirjLEtratej 匚 ^ULde bef 匚工匕 tijrnj.n^ on aut'Opurge»2144 him :77盘口或&女口空匚.ZLDAC 上已・ ot 色FdtoiCL"i :iii 工亡!IT /®□亡X BE □色血二n ・htilflsc^niaiiiT enance24 t 7h# r.ujTibsr af snap shots ta- retain tn dli+dJirtau't&pujEge ・ snapR*t a.inCanmt-3# Pur co tack. intocTral in hour 口27 t Set tc *0* ts 11 fable aute p>jrgs f oMure# ant op ILL 宦口 pux < s Int erv 41= 129修改后配置文件为:if The nuff.her cf ticks that tho uii"t ial 巾 即 cyriehecnLEa'tici^ phsso aan tdc« LnL^LinitsLDH Thp iriur.hPT of tick? Th 献 EWI hpiv^pnH s^niiing a QH ^SI 釧d t inj 砒 a^nnwlertg^ngrit synGLiinit=5 #t he direertcry the snapshoot LS siored. 常 dn not USB /i up DE storagBj /tup here is jusl 0 ezsnpLe srakes.dal iDi r=C : /kaf k 3 o oke epei^ 3k . 12/' sk Jai adat aLui-EDAi-C^k afls^zookEepejD-O.. 4. 12 ■''zld -atalDE :* t hs port at vhich the clieEirts wllL eoni^ct cli«YtPort=l2]81. nrwr, l?10. 99. D. 12s 12838: 13S8$ s«rwr. 2x10. SS. D. 13:12SB8:138BSserver. XI0.9現 0.14H2888:13838 mF 4F 10. 99« DL 15; 128BB; 13338 骂”兀T 甲 6^10- 99w 0-16: 12SS3;]33S$ I0 The 丁Lssiirm nixTiter of clzerrt cor_neztLons B星 inciease this if you used to hardlm note cLients ftnirCl:. erACrutr.s-SO 律律 sure to resd ths mwJtDnan©白 section of tho 0sliriinifflr iter 事二日暂 befcr? turning cn aurt operg :#・ 9 hrm ; f/r i)応总吕ac 直吐凶..o E " dou/cun:程nt/医U okE BnmrAitiiiiL htnlg 呂 c ;_maint Eaaric 甘# The niff-.bc-r of snapshots 十 0 rFt»n ui dstaPix'au.1 3pnTge ・ sTiSf Est aLnCount - J □□ 0 P -irg :€i tasjE int=r^al IZI hours■ J 堆 Set 1 D *0* 1 □ disable aut D purge fea1 LLITEorpij r 牴》r ・ pur 百 e Irrt e 匚晴呂 1 二24367 691011坨|14 161 ie n IF is卑21 22 23 ■£■?乔齐# The nwiiber of ffiillisecoads (of each tick £ tjckTjjir-20D0# The number of milliseconds of each tick tickTime=2000# The nu mber of ticks that the in itial# synchroni zati on phase can takeini tLimit=10# The nu mber of ticks that can pass betwee n# sending a request and gett ing an ack no wledgeme nt syncLimit=5# the directory where the sn apshot is stored.# do not use /tmp for storage, /tmp here is just# example sakes.dataDir二C:/kafka/zookeeper-3.4.12/zkdatadataLogDir二C:/kafka/zookeeper-3.4.12/zkdatalog# the port at which the clie nts will connectclie ntPort=12181server. 1=XX.XX.0.12:12888:13888server.2=XX.XX.0.13:12888:13888server.3=XX.XX.0.14:12888:13888server.4=XX.XX.0.15:12888:13888server.5=XX.XX.0.16:12888:13888# the maximum nu mber of clie nt conn ecti ons.# in crease this if you n eed to han dle more clie nts#maxClie ntCnxn s=60## Be sure to read the maintenance secti on of the# adm ini strator guide before tur ning on autopurge.##/doc/curre nt/zookeeperAdmi n.html#sc_mai nte nance## The nu mber of sn apshots to reta in in dataDirautopurge.s napRetai nCoun t=100# Purge task in terval in hours# Set to "0" to disable auto purge featureautopurge.purgeI nterval=24配置文件解释:#tickTime :这个时间是作为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime 时间就会发送一个心跳。
Kafaka详细介绍机制原理1. kafka介绍1.1. 主要功能根据官⽹的介绍,ApacheKafka®是⼀个分布式流媒体平台,它主要有3种功能: 1:It lets you publish and subscribe to streams of records.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因 2:It lets you store streams of records in a fault-tolerant way.以容错的⽅式记录消息流,kafka以⽂件的⽅式来存储消息流 3:It lets you process streams of records as they occur.可以再消息发布的时候进⾏处理1.2. 使⽤场景1:Building real-time streaming data pipelines that reliably get data between systems or applications.在系统或应⽤程序之间构建可靠的⽤于传输实时数据的管道,消息队列功能2:Building real-time streaming applications that transform or react to the streams of data。
构建实时的流数据处理程序来变换或处理数据流,数据处理功能1.3. 详细介绍Kafka⽬前主要作为⼀个分布式的发布订阅式的消息系统使⽤,下⾯简单介绍⼀下kafka的基本机制1.3.1 消息传输流程 Producer即⽣产者,向Kafka集群发送消息,在发送消息之前,会对消息进⾏分类,即Topic,上图展⽰了两个producer发送了分类为topic1的消息,另外⼀个发送了topic2的消息。
Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注⾃⼰需要的Topic中的消息 Consumer即消费者,消费者通过与kafka集群建⽴长连接的⽅式,不断地从集群中拉取消息,然后可以对这些消息进⾏处理。
kafka-go用法-回复[Kafkago用法] – 一步一步回答Kafka是一款高性能、分布式的消息队列系统,被广泛应用于大规模数据处理、事件驱动架构和实时流数据处理场景中。
它具备高吞吐量、持久性、可扩展性和容错性等特点,使得很多公司和组织选择使用Kafka来解决数据流的传输和处理问题。
在本文中,我将为您详细介绍Kafka的用法,并逐步解释其主要概念和操作步骤。
第一步:安装Kafka在开始使用Kafka之前,首先需要在您的机器上安装Kafka。
您可以从Kafka官方网站上下载适用于您操作系统的二进制文件,并按照官方文档提供的指引来进行安装。
一旦安装完成,您就可以继续进行下一步操作。
第二步:启动Kafka集群Kafka是一个分布式系统,通常由多个服务器组成一个集群。
在启动Kafka之前,您需要创建一个或多个服务器实例,并在它们的配置文件中指定Kafka的相关参数。
一旦配置完成,您可以依次启动每个服务器实例,组成一个Kafka集群。
第三步:创建Topic在Kafka中,消息按照Topic进行分类和存储。
一个Topic可以看作是一个消息的容器,每个消息都有一个唯一的标识符(offset)来表示在Topic中的位置。
要创建一个Topic,您可以使用Kafka提供的命令行工具或编程接口。
例如,可以使用命令`kafka-topics.sh`来创建一个新的Topic,并指定相关配置参数,如Topic的名称、分区数和副本数等。
第四步:生产者发送消息在Kafka中,消息生产者负责产生和发送消息。
一个生产者可以向一个或多个Topic发送消息,并可以按照指定的规则来分区和选择目标主题。
您可以使用Kafka提供的生产者API,或者调用相应的命令行工具进行消息的发送。
无论使用哪种方式,您都需要指定消息的内容和目标Topic。
第五步:消费者接收消息与生产者相对应的是消息消费者,它负责从一个或多个Topic中读取消息,并进行相应的处理。
kafka集群管理⼯具kafka-manager部署安装⼀、kafka-manager 简介为了简化开发者和服务⼯程师维护Kafka集群的⼯作,yahoo构建了⼀个叫做Kafka管理器的基于Web⼯具,叫做 Kafka Manager。
这个管理⼯具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。
它⽀持管理多个集群、选择副本、副本重新分配以及创建Topic。
同时,这个管理⼯具也是⼀个⾮常好的可以快速浏览这个集群的⼯具,有如下功能:1.管理多个kafka集群2.便捷的检查kafka集群状态(topics,brokers,备份分布情况,分区分布情况)3.选择你要运⾏的副本4.基于当前分区状况进⾏5.可以选择topic配置并创建topic(0.8.1.1和0.8.2的配置不同)6.删除topic(只⽀持0.8.2以上的版本并且要在broker配置中设置delete.topic.enable=true)7.Topic list会指明哪些topic被删除(在0.8.2以上版本适⽤)8.为已存在的topic增加分区9.为已存在的topic更新配置10.在多个topic上批量重分区11.在多个topic上批量重分区(可选partition broker位置)kafka-manager 项⽬地址:⼆、安装1. 环境要求1.安装jdk8jdk-1.8.0_602,kafka集群服务器:10.0.0.50:1218110.0.0.60:1218110.0.0.70:12181软件:kafka_2.8.0-0.8.1.1zookeeper-3.3.63.系统Linux kafka50 2.6.32-642.el6.x86_64 #1 SMP Tue May 1017:27:01 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux2. 下载安装 kafka-manager2.1 .下载kafka-manager想要查看和管理Kafka,完全使⽤命令并不⽅便,我们可以使⽤雅虎开源的Kafka-manager,GitHub地址如下:我们可以使⽤Git或者直接从Releases中下载,此处从下⾯的地址下载 1.3.3.7 版本:下载完成后解压。
kafka调用方式Kafka 是一个分布式流处理平台,通常用于构建实时数据流应用程序。
以下是使用Kafka 的一般步骤和调用方式:1. 安装Kafka:-首先,需要在你的系统上安装Kafka。
可以从Kafka 官网下载二进制版本,并按照官方文档中的指导安装。
2. 启动ZooKeeper:-Kafka 使用ZooKeeper 来协调和管理集群中的节点。
在启动Kafka 之前,确保ZooKeeper 已经在运行。
3. 启动Kafka Broker:-启动Kafka Broker,即Kafka 服务器。
这通常可以通过运行Kafka 的启动脚本实现。
4. 创建Topic:-在Kafka 中,消息被发布到主题(Topic)中。
在使用Kafka 前,你需要创建一个或多个主题。
5. 生产者(Producer):-生产者负责向Kafka 主题发送消息。
你可以使用Kafka 提供的客户端库,如Java、Python、或其他语言中的Kafka 生产者API。
```java// Java 生产者示例Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "mon.serialization.StringSerializer");props.put("value.serializer", "mon.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);producer.send(new ProducerRecord<>("my-topic", "key", "value"));producer.close();```6. 消费者(Consumer):-消费者从Kafka 主题中拉取消息。
kafka入门教程
Kafka是一个分布式流处理平台,被广泛应用于大数据处理和
实时数据流处理的场景。
它具有高吞吐量、可扩展性强、持久性和容错能力、以及多种数据处理模式等特点。
本教程将引导你进入Kafka的世界,教你如何安装、配置和使
用Kafka。
下面是教程内容概述:
1. 下载和安装Kafka
首先,你需要从Kafka的官方网站上下载最新版本的Kafka。
接下来,我们将介绍如何在不同操作系统上进行安装和配置。
2. 配置Kafka
在安装完成后,你需要对Kafka进行一些必要的配置。
这包
括设置Zookeeper连接、主题(Topic)的配置、分区(Partition)的配置等。
3. 创建生产者和消费者
一旦配置完成,你就可以创建Kafka的生产者和消费者实例了。
生产者可将消息发送到指定的主题,消费者则可以订阅特定的主题并消费其中的消息。
4. 发送和接收消息
通过创建生产者并将消息发送至主题,你可以实现向Kafka
集群发送消息的功能。
消费者可以通过订阅主题并接收消息来完成消息消费的功能。
5. Kafka的高级特性
在熟悉基本使用后,你还可以学习一些更高级的Kafka特性,如消息分区、消息持久化、消息的顺序性等。
6. 故障处理和调优
正确处理故障和进行性能优化是使用Kafka的关键。
我们将
介绍一些常见的故障处理和性能优化技巧。
通过本教程,你将能够迅速上手使用Kafka,并了解它的基本
概念和常用功能。
希望这对你入门Kafka有所帮助!。
Linux下Kafka下载与安装教程
原⽂链接:
⼀、预备环境
Kafka是java⽣态圈中的⼀员,运⾏在java虚拟机上,按Kafka官⽅说明,java环境推荐Java8;Kafka需要Zookeeper保存集群的元数据信息和消费者信息。
Kafka⼀般会⾃带Zookeeper,但是从稳定性考虑,应该使⽤单独的Zookeeper,⽽且构建Zookeeper集群。
jdk1.8下载安装教程:
zookeeper下载安装教程:
⼆、下载
官⽹下载:百度⽹盘:提取码:vow8
java开发⼯具下载地址及安装教程⼤全,点。
更多深度技术⽂章,在。
三、安装
1、将下载好的kafka安装⽂件上传⾄linux系统,可以使⽤ftp⼯具或者在窗⼝使⽤alt+p快捷键打开上传窗⼝。
拖拽kafka⽂件上传。
2、解压
tar -zxvf kafka_2.11-2.3.0.tgz
3、移动到/usr/local/,并改名kafka
mv kafka_2.11-2.3.0 /usr/local/kafka
4、启动zookeeper
进⼊zookeeper的bin⽬录执⾏
./zkServer.sh start
5、启动kafka
执⾏./bin/kafka-server-start.sh config/server.properties &
6、验证
jps命令,出现表⽰成功
原创⽂章,转载请注明出处。
java开发⼯具下载地址及安装教程⼤全,点。
更多深度技术⽂章,在。
cloudevents kafka java示例Cloudevents是一种用于在云原生环境中传输和处理事件的规范。
它提供了一种统一的事件模型和协议,使得在不同的云平台和开发框架之间进行事件传递变得简单和可扩展。
本文将以Cloudevents Kafka Java示例为主题,逐步介绍如何使用Cloudevents与Kafka 结合进行事件处理。
第一步:了解CloudeventsCloudevents是由CNCF(Cloud Native Computing Foundation)维护的开放标准,用于在云原生环境中传输和处理事件。
它定义了一套事件规范,包括事件格式和协议。
Cloudevents允许开发者使用不同的编程语言和框架创建和处理事件,而无需关心底层的实现细节。
第二步:安装Kafka在使用Cloudevents和Kafka之前,需要先安装和配置Kafka。
Kafka是一个分布式事件流平台,它允许高吞吐量的发布和订阅事件。
可以从Apache Kafka的官方网站下载和安装Kafka,并按照官方文档进行配置。
第三步:创建Kafka Producer在Java中,可以使用Kafka提供的KafkaProducer类来创建一个Kafka Producer实例,并将事件发送到Kafka集群。
下面是一个简单的示例代码:javaimport org.apache.kafka.clients.producer.*;import io.cloudevents.*;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer","org.apache.kafkamon.serialization.StringSerializer");props.put("value.serializer","io.cloudevents.kafka.CloudEventSerializer");KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(props);CloudEvent event = CloudEventBuilder.v1().withId("1").withType("com.example.event").withSource(URI.create("/")).withData("text/plain", "Hello,Kafka!".getBytes()).build();ProducerRecord<String, CloudEvent> record = new ProducerRecord<>("my-topic", event);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {exception.printStackTrace();}}});producer.close();}}在上面的示例中,我们创建了一个KafkaProducer实例,并配置了Kafka的地址和序列化器。
kafka3 的python使用方法【原创版3篇】《kafka3 的python使用方法》篇1Kafka 3 是Kafka 的一个新版本,它带来了许多性能和功能上的改进。
在Python 中使用Kafka 3,可以使用`kafka-python-client` 库。
首先需要安装这个库,可以通过以下命令进行安装:```pip install kafka-python-client```安装完成后,可以使用以下代码示例来创建一个Kafka 3 的生产者和消费者:```pythonfrom kafka import KafkaProducerfrom kafka import KafkaConsumerimport json# 创建Kafka 生产者producer = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=lambda v: json.dumps(v).encode("utf-8"))# 要发送的消息messages = [{"key": "value"},{"key": "value2"},]# 发送消息到名为"test-topic" 的Kafka 主题producer.send("test-topic", messages)# 确保所有消息都已发送并确认producer.flush()# 关闭生产者producer.close()# 创建Kafka 消费者consumer = KafkaConsumer(bootstrap_servers="localhost:9092", group_id="test-group", value_serializer=lambda v:json.loads(v.decode("utf-8")))# 订阅名为"test-topic" 的Kafka 主题consumer.subscribe("test-topic")# 获取消费者中的消息for message in consumer:print(message.value)# 确保消费者已成功处理所有消息consumer.close()```这个示例首先创建一个Kafka 生产者,发送一些消息到名为"test-topic" 的主题。
librdkafka编译libdrkafka:数据流机制的关键组件编译librdkafkalibrdkafka是一款功能强大的Apache Kafka客户端库,可以帮助开发者快速地开发基于Kafka的应用程序,其可以支持C、C++、Python、Go、.NET和Erlang等多种语言开发。
在本文中,我们将简单介绍librdkafka编译,供大家学习参考。
一、安装编译器要编译librdkafka,首先要安装编译器,在Linux上可以使用gcc或者clang;在Windows上可以使用MSVC;在macOS上可以使用Xcode。
二、安装cmake在编译librdkafka之前,还需要安装cmake,它是一款开源的编译管理软件,可以把源代码转换成程序可以理解的格式。
在Windows上可以到官网下载msi安装包进行安装;在Linux和macOS上可以使用下面的命令行来安装:```bashsudo apt install cmake # Debian Linuxsudo yum install cmake # RedHat/CentOS Linuxbrew install cmake # macOS三、编译librdkafka(1)安装依赖在编译librdkafka前,还需要安装一些依赖库。
在Linux上,可以执行下面的命令来安装依赖:sudo apt install libssl-dev zlib1g-dev # Debian Linuxsudo yum install openssl-devel zlib-devel # RedHat/CentOS Linuxbrew install openssl zlib # macOS(2)下载librdkafka编译之前,要先从Github上下载librdkafka源码。
git clone /edenhill/librdkafka.git然后进入librdkafka目录,执行下面的命令来进行编译:mkdir build && cd buildcmake ..make(3)安装librdkafka完成编译后,可以使用下面的命令行安装librdkafka:sudo make install四、总结以上,就是关于librdkafka编译的介绍,通过编译安装librdkafka可以帮助开发者更轻松地编写Kafka应用程序,进而更加高效地支持大规模的分布式Kafka系统。
Linux下安装kafka本⽂主要介绍如何在Centos7下安装kafka。
本⽂使⽤的kafka版本是:2.8.0安装JDK下载安装zookeeper下载安装kafka设置开机⾃动启动⼀、安装JDK在安装kafka之前必须先安装JDK和zookeeper,如何安装JDK,可以查看:⼆、下载安装zookeeper如何在linux下安装zookeeper,这⾥不再赘述,上⼀篇博客已经详细介绍过,需要的朋友可以查看:三、下载安装kafka3.1 通过官⽹直接下载:// 进⼊需要下载的⽬录cd /data// 下载kafkawget https:///apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz3.2 解压并进⼊kafka⽬录tar -xzf kafka_2.12-2.8.0.tgzcd kafka_2.12-2.8.03.3 启动kafka3.3.1 启动kafka之前要确保zookeeper已经启动,如果没有启动,执⾏以下命令:zkServer.sh3.3.2 启动kafka之前,需要修改kafka配置⽂件中的zookeeper地址,打开配置⽂件:vi config/server.properties修改zookeeper.connect=192.168.1.202:2081(这⾥修改成⾃⼰安装的zookeeper地址和端⼝即可),除了修改zookeeper服务地址外,还有其他配置项,如⽇志储存路径、消息的最⼤持久化时间、端⼝等等,这⾥不再赘述,有兴趣的朋友可以查看我以前的⼀篇博客⾥⾯已经详细说明过。
3.3.3 启动kafkabin/kafka-server-start.sh config/server.properties四、设置开机⾃动启动4.1 切换到/lib/systemd/system/⽬录,创建⾃启动⽂件cd /lib/systemd/system/vi kafka.service⽂件内容如下:[Unit]Description=kafkaserviceAfter=network.target[Service]WorkingDirectory=/data/kafka_2.12-2.8.0ExecStart=/data/kafka_2.12-2.8.0/bin/kafka-server-start.sh config/server.propertiesExecStop=/data/kafka_2.12-2.8.0/bin/kafka-server-stop.shUser=rootGroup=rootRestart=alwaysRestartSec=10[Install]WantedBy=multi-user.target4.2 设置⾃启动systemctl enable kafka.service4.3 ⽴即启动服务systemctl start kafka.service4.4 查看启动状态systemctl status kafka.service⾄此,linux安装kafka就完成了!。
flink cdc消费kafka用法FLINK CDC消费Kafka用法导读:Flink是一个流式处理引擎,而Kafka是一个分布式流处理平台。
Flink CDC (Change Data Capture)是一种用于从数据库中捕获和传输变更数据的技术。
结合Flink和CDC,可以实现从数据库到Kafka的数据流同步和实时处理。
本文将详细介绍如何使用Flink CDC消费Kafka,包括配置环境、使用Flink CDC 源和Sink,以及处理Kafka消息等。
第一步:配置环境在开始之前,需要确保已经正确配置了Flink和Kafka的环境。
1. 下载和安装Flink:可以从Flink的官方网站下载Flink的二进制包,并按照官方文档的说明进行安装。
2. 下载和安装Kafka:可以从Kafka的官方网站下载Kafka的二进制包,并按照官方文档的说明进行安装。
3. 配置Flink和Kafka的连接:在Flink的配置文件flink-conf.yaml中添加以下配置:yaml# Kafka相关配置bootstrap.servers: localhost:9092# ZooKeeper相关配置zookeeper.connect: localhost:2181这样就完成了Flink和Kafka的配置。
接下来,我们需要创建一个Kafka主题,用于数据的传输。
第二步:使用Flink CDC源Flink提供了一个专门用于从CDC源中读取变更数据的源函数(Source Function)。
我们可以使用该源函数从数据库中捕获变更数据,并将其发送到Kafka中。
1. 添加Flink CDC的依赖:在项目的pom.xml文件中添加以下依赖:xml<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-cdc</artifactId><version>{flink.version}</version></dependency>这样就完成了Flink CDC的依赖配置。
bin/kafka-topics.sh --describe --zookeeper hadoop1:2181 --topic test--topic testTopic:test PartitionCount:3 ReplicationFactor:3 Configs:Topic: test Partition: 0 Leader: 110 Replicas: 110,111,112 Isr: 110,111,112Topic: test Partition: 1 Leader: 111 Replicas: 111,112,110 Isr: 111,112,110Topic: test Partition: 2 Leader: 112 Replicas: 112,110,111 Isr: 112,110,111说明:{partiton:分区:IDleader:当前负责读写的lead broker idkrelicas:当前partition的所有replication broker listisr:relicas的子集,只包含出于活动状态的broker}去zk上看kafka集群[zk: localhost:2181(CONNECTED) 5] ls /[admin, zookeeper, consumers, config, controller, zk-fifo, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 6] ls /brokers ----> 查看注册在zk内的kafka[topics, ids][zk: localhost:2181(CONNECTED) 7] ls /brokers/ids[112, 110, 111][zk: localhost:2181(CONNECTED) 8] ls /brokers/ids/112[][zk: localhost:2181(CONNECTED) 9] ls /brokers/topics[test][zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test[partitions][zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/test/partitions[2, 1, 0][zk: localhost:2181(CONNECTED) 12]8、关闭kafkapkill -9 -f server.properti二、kafka java调用:1、 java端生产数据, kafka集群消费数据:Java代码1. 1创建maven工程,pom.xml中增加如下:2. <dependency>3. <groupId>org.apache.kafka</groupId>4. <artifactId>kafka_2.10</artifactId>5. <version>0.8.2.0</version>6. </dependency>7.8.9. 2 java代码:向主题test内写入数据10.11. import java.util.Properties;12. import java.util.concurrent.TimeUnit;13.14. import kafka.javaapi.producer.Producer;15. import kafka.producer.KeyedMessage;16. import kafka.producer.ProducerConfig;17. import kafka.serializer.StringEncoder;18.19.20.21.22. public class kafkaProducer extends Thread{23.24. private String topic;25.26. public kafkaProducer(String topic){27. super();28. this.topic = topic;29. }30.31.32. @Override33. public void run() {34. Producer producer = createProducer();35. int i=0;36. while(true){37. producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));38. try {39. TimeUnit.SECONDS.sleep(1);40. } catch (InterruptedException e) {41. e.printStackTrace();42. }43. }44. }45.46. private Producer createProducer() {47. Properties properties = new Properties();48. properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk49. properties.put("serializer.class", "kafka.serializer.StringEncoder");50. properties.put("metadata.broker.list", "192.168.1.110:9092,192.168.1.111:9093,192.168.1.112:9094");// 声明kafka broker51. return new Producer<Integer, String>(new ProducerConfig(properties));52. }53.54.55. public static void main(String[] args) {56. new kafkaProducer("test").start();// 使用kafka集群中创建好的主题 test57.58. }59.60. }61.62.63.64.65. 3 kafka集群中消费主题test的数据:66. [root@h2master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginnin67.68. 4启动java代码,然后在看集群消费的数据如下:69.70. message: 071. message: 172. message: 273. message: 374. message: 475. message: 576. message: 677. message: 778. message: 879. message: 980. message: 1081. message: 1182. message: 1283. message: 1384. message: 1485. message: 1586. message: 1687. message: 1788. message: 1889. message: 1990. message: 2091. message: 212、kafka 使用Java写消费者,这样先运行kafkaProducer ,在运行kafkaConsumer,即可得到生产者的数据:Java代码1. import java.util.HashMap;2. import java.util.List;3. import java.util.Map;4. import java.util.Properties;5.6. import kafka.consumer.Consumer;7. import kafka.consumer.ConsumerConfig;8. import kafka.consumer.ConsumerIterator;9. import kafka.consumer.KafkaStream;10. import kafka.javaapi.consumer.ConsumerConnector;11.12.13.14.15. /**16. * 接收数据17. * 接收到: message: 1018. 接收到: message: 1119. 接收到: message: 1220. 接收到: message: 1321. 接收到: message: 1422. * @author zm23. *24. */25. public class kafkaConsumer extends Thread{26.27. private String topic;28.29. public kafkaConsumer(String topic){30. super();31. this.topic = topic;32. }33.34.35. @Override36. public void run() {37. ConsumerConnector consumer = createConsumer();38. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();39. topicCountMap.put(topic, 1); // 一次从主题中获取一个数据40. Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =consumer.createMessageStreams(topicCountMap);41. KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据42. ConsumerIterator<byte[], byte[]> iterator = stream.iterator();43. while(iterator.hasNext()){44. String message = new String(iterator.next().message());45. System.out.println("接收到: " + message);46. }47. }48.49. private ConsumerConnector createConsumer() {50. Properties properties = new Properties();51. properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk52. properties.put("group.id", "group1");// 必须要使用别的组名称,如果生产者和消费者都在同一组,则不能访问同一组内的topic数据53. return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));54. }55.56.57. public static void main(String[] args) {58. new kafkaConsumer("test").start();// 使用kafka集群中创建好的主题 test59.60. }61.62. }。
kafka在windows下的安装和配置博主最近在学习有关kafka的配置安装以及在spring的集成使⽤。
但⽹上关于kafka的配置参考资料基本都是于linux下的配置,于是博主在整理了相关windows下kafka的配置记录在博客⾥。
由于是简单配置所以在这⾥只建了⼀个topic以及⼀个producer和两个consumer。
在官⽹上下载 zookeeper和kafka(我下的版本kafka_2.11-0.11.0.0,这个版本中bin⽬录下有windows⽬录),注意不要下载源码包(名字中带有src),否则启动的时候会报错。
1、配置好jdk环境2、解压zookeeper到指定⽬录,找到解压后⽬录中conf⽂件夹中zoo_sample - 副本.cfg⽂件,复制在conf中改名为zoo.cfg。
在bin⽂件夹中打开zkServer.bat启动zookeeper。
⾄此,zookeeper启动完成。
3、解压kafka到指定⽬录。
查看kafka根⽬录中config⽂件夹下server.properties,确认其中关于zookeeper的连接端⼝和zookeep中zoo.cfg的端⼝⼀致。
3.1、启动kafka 在cmd中进⼊kafka根⽬录。
输⼊以下命令: .\bin\windows\kafka-server-start.bat .\config\server.properties kafka启动成功 3.2、创建topic 在cmd中进⼊kafka\bin\windows⽬录,输⼊以下命令: kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 创建成功 3.3、创建kafka producer 在cmd中进⼊kafka\bin\windows⽬录,输⼊以下命令: kafka-console-producer.bat --broker-list localhost:9092 --topic test 注意这⾥的端⼝和server.properties的端⼝号⼀致。
kafka安装及配置过程⼀、安装kafka根据Scala版本不同,⼜分为多个版本,我不需要使⽤Scala,所以就下载官⽅推荐版本kafka_2.12-2.4.0.tgz。
使⽤tar -xzvf kafka_2.12-2.4.0.tgz 解压为了使⽤⽅便,可以创建软链接kafka0⼆、Zookeeper配置当前下载的kafka程序⾥⾃带Zookeeper,可以直接使⽤其⾃带的Zookeeper建⽴集群,也可以单独使⽤Zookeeper安装⽂件建⽴集群。
1. 单独使⽤Zookeeper安装⽂件建⽴集群Zookeeper的安装及配置可以参考另⼀篇博客,⾥⾯有详细介绍2. 直接使⽤其⾃带的Zookeeper建⽴集群kafka⾃带的Zookeeper程序脚本与配置⽂件名与原⽣Zookeeper稍有不同。
kafka⾃带的Zookeeper程序使⽤bin/zookeeper-server-start.sh,以及bin/zookeeper-server-stop.sh来启动和停⽌Zookeeper。
⽽Zookeeper的配制⽂件是config/zookeeper.properties,可以修改其中的参数(1)启动Zookeeperbin/zookeeper-server-start.sh -daemon config/zookeeper.properties加-daemon参数,可以在后台启动Zookeeper,输出的信息在保存在执⾏⽬录的logs/zookeeper.out⽂件中。
对于⼩内存的服务器,启动时有可能会出现如下错误os::commit_memory(0x00000000e0000000, 536870912, 0) failed; error='Not enough space' (errno=12)可以通过修改bin/zookeeper-server-start.sh中的参数,来减少内存的使⽤,将下图中的-Xmx512M -Xms512M改⼩。
1.1.1.安装kafka
获取kafka安装包kafka_2.11-0.8.2.0.tgz
1、将软件包上传到/app/目录下,以下蓝色字体为命令:
cd /app/
tar -zxvf kafka_2.11-0.8.2.0.tgz
mv kafka_2.11-0.8.2.0 kafka
2、修改配置文件 conf/server.properties,红色部分为必须修改
的配置
修改配置文件 bin/kafka-run-class.sh
3、启动kafka
cd bin
nohup./kafka-server-start.sh ../config/server.proper
ties&
4、测试
1)查看进程
jps
37400 Kafka
含义kafka进程说明成功。
5、使用scp命令复制至其他服务器,并修改必要的配置文件
scp –r /app/kafka host:/app/
1.1.
2.安装kafka-manager
1、获取安装包kafka-manager-1.0-SNAPSHOT.zip,上传至/app目录下,执行:
cd /app
unzip kafka-manager-1.0-SNAPSHOT.zip
mv kafka-manager-1.0-SNAPSHOT kafka-manager
2、修改配置文件conf/application.conf
找到如下,修改为一个可用的zookeeper地址,该地址为kafka-manager
存储数据的地方,不必须是kafka注册的zookeeper地址。
kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
3、启动
cd bin
./kafka-manager -Dconfig.file=../conf/application.conf& 访问网址,必须用IE,360不兼容
http://localhost:9000/
按如下图示添加集群。