欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识
原文出处: Valleylord

Kafka 的东西和编程接口

Kafka 的东西

Kafka 提供的东西照旧较量全的,昆山软件开发bin/ 目次下的东西有以下一些,

bin/connect-distributed.sh     bin/kafka-consumer-offset-checker.sh     bin/kafka-replica-verification.sh   bin/kafka-verifiable-producer.sh
bin/connect-standalone.sh      bin/kafka-consumer-perf-test.sh          bin/kafka-run-class.sh              bin/zookeeper-security-migration.sh
bin/kafka-acls.sh              bin/kafka-mirror-maker.sh                bin/kafka-server-start.sh           bin/zookeeper-server-start.sh
bin/kafka-configs.sh           bin/kafka-preferred-replica-election.sh  bin/kafka-server-stop.sh            bin/zookeeper-server-stop.sh
bin/kafka-console-consumer.sh  bin/kafka-producer-perf-test.sh          bin/kafka-simple-consumer-shell.sh  bin/zookeeper-shell.sh
bin/kafka-console-producer.sh  bin/kafka-reassign-partitions.sh         bin/kafka-topics.sh
bin/kafka-consumer-groups.sh   bin/kafka-replay-log-producer.sh         bin/kafka-verifiable-consumer.sh

我常用的呼吁有以下几个,

bin/kafka-server-start.sh -daemon config/server.properties &
bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
bin/kafka-topics.sh --list --zookeeper 192.168.232.23:2181
bin/kafka-topics.sh --delete --zookeeper 192.168.232.23:2181 --topic topic1
bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1
bin/kafka-console-consumer.sh --zookeeper 192.168.232.23:2181 --topic topic1 --from-beginning
bin/kafka-console-producer.sh --broker-list 192.168.232.23:9092 --topic topic1

kafka-server-start.sh 是用于 Kafka 的 Broker 启动的,主要就一个参数 config/server.properties,该文件中的设置项待会再说.尚有一个 -daemon 参数,这个是将 Kafka 放在靠山用守护历程的方法运行,假如不加这个参数,Kafka 会在运行一段时间后自动退出,听说这个是 0.10.0.0 版本才有的问题 5。kafka-topics.sh 是用于打点 Topic 的东西,我主要用的 --describe--list--delete--create 这4个成果,上述的例子根基是不言自明的,--replication-factor 3--partitions 2 这两个参数别离暗示3个副本(含 Leader),和2个分区。kafka-console-consumer.sh 和 kafka-console-producer.sh 是出产者和消费者的浅易终端东西,在调试的时候较量有用,我常用的是 kafka-console-consumer.sh。我没有用 Kafka 自带的 zookeeper,而是用的 zookeeper 官方的宣布版本 3.4.8,端口是默认2181,与 Broker 在同一台呆板上。

下面说一下 Broker 启动的设置文件 config/server.properties,我在默认设置的基本上,修改了以下一些,

broker.id=0
listeners=PLAINTEXT://192.168.232.23:9092
log.dirs=/tmp/kafka-logs
delete.topic.enable=true

broker.id 是 Kafka 集群中的 Broker ID,不行反复,我在多副本的尝试中,将他们别离配置为0、1、2;listeners 是 Broker 监听的地点,默认是监听 localhost:9092,因为我不是单机尝试,所以修改为本机局域网地点,虽然,假如要监听所有地点的话,也可以配置为 0.0.0.0:9092,多副本尝试中,将监听端口别离配置为 9092、9093、9094;log.dirs 是 Broker 的 log 的目次,多副本尝试中,差异的 Broker 需要有差异的 log 目次;delete.topic.enable 设为 true 后,可以删除 Topic,而且连带 Topic 中的动静也一并删掉,不然,纵然挪用 kafka-topics.sh --delete 也无法删除 Topic,这是一个便利性的配置,对付开拓情况可以,出产情况必然要设为 false(默认)。尝试中发明, 假如有消费者在消费这个 Topic,那么也无法删除,照旧较量安详的。

剩下的东西大都在文档中也有提到。假如看一下这些剧本的话,会发明大都剧本的写法都是一致的,先做一些参数的校验,最后运行 exec $base_dir/kafka-run-class.sh XXXXXXXXX "$@",可见,这些东西都是利用运行 Java Class 的方法挪用的。

Kafka 的 Java API

在编程接口方面,官方提供了 Scala 和 Java 的接口,社区提供了更多的其他语言的接口,根基上,无论用什么语言开拓,都能找到相应的 API。下面说一下 Java 的 API 接口。

出产者的 API 只有一种,相比拟力简朴,代码如下,

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SimpleProducerDemo {
    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.232.23:9092,192.168.232.23:9093,192.168.232.23:9094");
        props.put("zookeeper.connect", "192.168.232.23:2181");
        props.put("client.id", "DemoProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        String topic = "topic1";
        Boolean isAsync = false;
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + String.format("%05d",messageNo);
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ++messageNo;
        }
    }
}
class DemoCallBack implements Callback {
    private final long startTime;
    private final int key;
    private final String message;
    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "Send     message: (" + String.format("%05d",key) + ", " + message + ") at offset "+ metadata.offset() +
                            " to partition(" + metadata.partition() +
                            ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}