Kafka 的东西和编程接口
Kafka 的东西
Kafka 提供的东西照旧较量全的,昆山软件开发,bin/
目次下的东西有以下一些,
bin/connect-distributed.sh bin/kafka-consumer-offset-checker.sh bin/kafka-replica-verification.sh bin/kafka-verifiable-producer.sh bin/connect-standalone.sh bin/kafka-consumer-perf-test.sh bin/kafka-run-class.sh bin/zookeeper-security-migration.sh bin/kafka-acls.sh bin/kafka-mirror-maker.sh bin/kafka-server-start.sh bin/zookeeper-server-start.sh bin/kafka-configs.sh bin/kafka-preferred-replica-election.sh bin/kafka-server-stop.sh bin/zookeeper-server-stop.sh bin/kafka-console-consumer.sh bin/kafka-producer-perf-test.sh bin/kafka-simple-consumer-shell.sh bin/zookeeper-shell.sh bin/kafka-console-producer.sh bin/kafka-reassign-partitions.sh bin/kafka-topics.sh bin/kafka-consumer-groups.sh bin/kafka-replay-log-producer.sh bin/kafka-verifiable-consumer.sh
我常用的呼吁有以下几个,
bin/kafka-server-start.sh -daemon config/server.properties & bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1 bin/kafka-topics.sh --list --zookeeper 192.168.232.23:2181 bin/kafka-topics.sh --delete --zookeeper 192.168.232.23:2181 --topic topic1 bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1 bin/kafka-console-consumer.sh --zookeeper 192.168.232.23:2181 --topic topic1 --from-beginning bin/kafka-console-producer.sh --broker-list 192.168.232.23:9092 --topic topic1
kafka-server-start.sh
是用于 Kafka 的 Broker 启动的,主要就一个参数 config/server.properties
,该文件中的设置项待会再说.尚有一个 -daemon
参数,这个是将 Kafka 放在靠山用守护历程的方法运行,假如不加这个参数,Kafka 会在运行一段时间后自动退出,听说这个是 0.10.0.0 版本才有的问题 5。kafka-topics.sh
是用于打点 Topic 的东西,我主要用的 --describe
、--list
、--delete
、--create
这4个成果,上述的例子根基是不言自明的,--replication-factor 3
、--partitions 2
这两个参数别离暗示3个副本(含 Leader),和2个分区。kafka-console-consumer.sh
和 kafka-console-producer.sh
是出产者和消费者的浅易终端东西,在调试的时候较量有用,我常用的是 kafka-console-consumer.sh
。我没有用 Kafka 自带的 zookeeper,而是用的 zookeeper 官方的宣布版本 3.4.8,端口是默认2181,与 Broker 在同一台呆板上。
下面说一下 Broker 启动的设置文件 config/server.properties
,我在默认设置的基本上,修改了以下一些,
broker.id=0 listeners=PLAINTEXT://192.168.232.23:9092 log.dirs=/tmp/kafka-logs delete.topic.enable=true
broker.id
是 Kafka 集群中的 Broker ID,不行反复,我在多副本的尝试中,将他们别离配置为0、1、2;listeners
是 Broker 监听的地点,默认是监听 localhost:9092
,因为我不是单机尝试,所以修改为本机局域网地点,虽然,假如要监听所有地点的话,也可以配置为 0.0.0.0:9092
,多副本尝试中,将监听端口别离配置为 9092、9093、9094;log.dirs
是 Broker 的 log 的目次,多副本尝试中,差异的 Broker 需要有差异的 log 目次;delete.topic.enable
设为 true 后,可以删除 Topic,而且连带 Topic 中的动静也一并删掉,不然,纵然挪用 kafka-topics.sh --delete
也无法删除 Topic,这是一个便利性的配置,对付开拓情况可以,出产情况必然要设为 false(默认)。尝试中发明, 假如有消费者在消费这个 Topic,那么也无法删除,照旧较量安详的。
剩下的东西大都在文档中也有提到。假如看一下这些剧本的话,会发明大都剧本的写法都是一致的,先做一些参数的校验,最后运行 exec $base_dir/kafka-run-class.sh XXXXXXXXX "$@"
,可见,这些东西都是利用运行 Java Class 的方法挪用的。
Kafka 的 Java API
在编程接口方面,官方提供了 Scala 和 Java 的接口,社区提供了更多的其他语言的接口,根基上,无论用什么语言开拓,都能找到相应的 API。下面说一下 Java 的 API 接口。
出产者的 API 只有一种,相比拟力简朴,代码如下,
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; public class SimpleProducerDemo { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "192.168.232.23:9092,192.168.232.23:9093,192.168.232.23:9094"); props.put("zookeeper.connect", "192.168.232.23:2181"); props.put("client.id", "DemoProducer"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<Integer, String> producer = new KafkaProducer<>(props); String topic = "topic1"; Boolean isAsync = false; int messageNo = 1; while (true) { String messageStr = "Message_" + String.format("%05d",messageNo); long startTime = System.currentTimeMillis(); if (isAsync) { // Send asynchronously producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); } else { // Send synchronously try { producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get(); System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } try { Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } ++messageNo; } } } class DemoCallBack implements Callback { private final long startTime; private final int key; private final String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; this.key = key; this.message = message; } public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { System.out.println( "Send message: (" + String.format("%05d",key) + ", " + message + ") at offset "+ metadata.offset() + " to partition(" + metadata.partition() + ") in " + elapsedTime + " ms"); } else { exception.printStackTrace(); } } }