如何通过Java在Kafka中创建主题

 mobiledu2502857407 发布于 2022-12-10 13:30

我想通过java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题.如果我在命令提示符下创建一个主题,并且如果我通过java api推送消息,它工作正常.但我想通过java api创建一个主题.经过长时间的搜索,我发现下面的代码,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

我尝试了上面的代码,它显示主题已创建,但我无法在主题中推送消息.我的代码中有什么问题吗?或者以其他任何方式实现上述目标?

5 个回答
  • AdminUtils API已弃用。有一个新的API AdminZkClient,我们可以使用它来管理Kafka服务器中的主题。

    String zookeeperHost = "127.0.0.1:2181";
    Boolean isSucre = false;
    int sessionTimeoutMs = 200000;
    int connectionTimeoutMs = 15000;
    int maxInFlightRequests = 10;
    Time time = Time.SYSTEM;
    String metricGroup = "myGroup";
    String metricType = "myType";
    KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
                    connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);
    
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    
    String topicName1 = "myTopic";
    int partitions = 3;
    int replication = 1;
    Properties topicConfig = new Properties();
    
    adminZkClient.createTopic(topicName1,partitions,replication,
                topicConfig,RackAwareMode.Disabled$.MODULE$);
    

    您可以参考此链接以获取详细信息:https : //www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/

    2022-12-11 01:59 回答
  • 在最新的(2.1.0)API中,该过程似乎已大大简化。使用适用于Kafka 2.1.0的最新API,可以按以下步骤进行操作

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;
    
    Properties properties = new Properties();
    properties.load(new FileReader(new File("kafka.properties")));
    
    AdminClient adminClient = AdminClient.create(properties);
    NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor)
    
    List<NewTopic> newTopics = new ArrayList<NewTopic>();
    newTopics.add(newTopic);
    
    adminClient.createTopics(newTopics);
    adminClient.close();
    

    kafka.properties文件的内容如下

    bootstrap.servers=localhost:9092
    group.id=test
    enable.auto.commit=true
    auto.commit.interval.ms=1000
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    

    请注意,必须关闭AdminClient的实例才能反映新创建的主题。

    2022-12-11 02:06 回答
  • 我修好了..经过长时间的研究......

    ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
    AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
    

    从上面的代码中,ZkClient将创建一个主题,但是这个主题信息将无法识别kafka.所以我们要做的是,我们需要以下列方式为ZkClient创建对象,

    首先导入以下声明,

    import kafka.utils.ZKStringSerializer$;
    

    并按以下方式为ZkClient创建对象,

    ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
    AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
    

    编辑1 :(对于@ajkret评论)

    由于api已被更改,上面的代码不适用于kafka> 0.9,使用以下代码为kafka> 0.9


    import java.util.Properties;
    import kafka.admin.AdminUtils;
    import kafka.utils.ZKStringSerializer$;
    import kafka.utils.ZkUtils;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.ZkConnection;
    
    public class KafkaTopicCreationInJava
    {
        public static void main(String[] args) throws Exception {
            ZkClient zkClient = null;
            ZkUtils zkUtils = null;
            try {
                String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
                int sessionTimeOutInMs = 15 * 1000; // 15 secs
                int connectionTimeOutInMs = 10 * 1000; // 10 secs
    
                zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
                zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
    
                String topicName = "testTopic";
                int noOfPartitions = 2;
                int noOfReplication = 3;
                Properties topicConfiguration = new Properties();
    
                AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
    
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (zkClient != null) {
                    zkClient.close();
                }
            }
        }
    }
    

    2022-12-11 02:07 回答
  • 对于那些试图在kafka v0.10.2.1中实现这一目的并遇到序列化错误的问题' java.io.StreamCorruptedException: invalid stream header: 3139322E'以下是带有必要导入的示例工作代码.

    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.ZkConnection;
    import org.I0Itec.zkclient.exception.ZkMarshallingError;
    import org.I0Itec.zkclient.serialize.ZkSerializer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    
    import kafka.admin.AdminUtils;
    import kafka.admin.RackAwareMode;
    import kafka.utils.ZKStringSerializer;
    import kafka.utils.ZkUtils;
    
    public static void createTopic(String topicName, int numPartitions, int numReplication) {
            ZkClient zkClient = null;
            ZkUtils zkUtils = null;
            try {
                String zookeeperHosts = "199.98.916.902:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
                int sessionTimeOutInMs = 15 * 1000; // 15 secs
                int connectionTimeOutInMs = 10 * 1000; // 10 secs
    
                zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs);
                //Ref: https://gist.github.com/jjkoshy/3842975
                zkClient.setZkSerializer(new ZkSerializer() {
                    @Override
                    public byte[] serialize(Object o) throws ZkMarshallingError {
                        return ZKStringSerializer.serialize(o);
                    }
    
                    @Override
                    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                        return ZKStringSerializer.deserialize(bytes);
                    }
                });
    
                zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
    
                int noOfPartitions = 2;
                int noOfReplication = 3;
                Properties topicConfiguration = new Properties();
    
                AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration,
                        RackAwareMode.Enforced$.MODULE$);
    
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (zkClient != null) {
                    zkClient.close();
                }
            }
        }
    

    2022-12-11 02:09 回答
  • 只是指向任何使用Kafka更新版本查看此内容的人(在撰写本文时,我使用的是Kafka v0.10.0.0).

    你必须改变;

    AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, topicConfiguration);
    

    以下;

    AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, true, Enforced$.MODULE$);
    

    完成后关闭连接也是个好主意;

    zkClient.close();
    

    2022-12-11 02:56 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有