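I want to create a topic in Kafka (kafka_2.8.0-0.8.1.1) from Java. If I create the topic from the command prompt and then push messages through the Java API, everything works fine. But I want to create the topic through the Java API as well. After a long search, I found the code below: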
    ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
    AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
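I tried the code above, and it reports that the topic was created, but I cannot push messages to that topic. Is something wrong with my code? Or is there another way to achieve this?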
The AdminUtils API is deprecated. There is a newer API, AdminZkClient, that can be used to manage topics on a Kafka server.
    import java.util.Properties;
    import kafka.admin.RackAwareMode;
    import kafka.zk.AdminZkClient;
    import kafka.zk.KafkaZkClient;
    import org.apache.kafka.common.utils.Time;

    String zookeeperHost = "127.0.0.1:2181";
    boolean isSecure = false;
    int sessionTimeoutMs = 200000;
    int connectionTimeoutMs = 15000;
    int maxInFlightRequests = 10;
    Time time = Time.SYSTEM;
    String metricGroup = "myGroup";
    String metricType = "myType";

    // Open a ZooKeeper-backed client and wrap it in the admin helper.
    KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost, isSecure, sessionTimeoutMs,
            connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);

    String topicName1 = "myTopic";
    int partitions = 3;
    int replication = 1;
    Properties topicConfig = new Properties();

    adminZkClient.createTopic(topicName1, partitions, replication,
            topicConfig, RackAwareMode.Disabled$.MODULE$);
For details, see this link: https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/
In the latest (2.1.0) API, this process appears to have been greatly simplified. With the API for Kafka 2.1.0, it can be done as follows:
    import java.io.File;
    import java.io.FileReader;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;

    Properties properties = new Properties();
    properties.load(new FileReader(new File("kafka.properties")));

    AdminClient adminClient = AdminClient.create(properties);

    // new NewTopic(topicName, numPartitions, replicationFactor)
    NewTopic newTopic = new NewTopic("topicName", 1, (short) 1);

    List<NewTopic> newTopics = new ArrayList<NewTopic>();
    newTopics.add(newTopic);

    adminClient.createTopics(newTopics);
    adminClient.close();
The contents of the kafka.properties file are as follows:
    bootstrap.servers=localhost:9092
    group.id=test
    enable.auto.commit=true
    auto.commit.interval.ms=1000
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
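Note that the AdminClient instance must be closed for the newly created topic to show up. This is presumably because createTopics() is asynchronous and close() waits for pending requests to finish.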
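The kafka.properties file in this answer also carries consumer settings (group.id, the deserializers, and so on). As far as I know, an AdminClient only needs bootstrap.servers, and unknown keys are merely reported as unused. Here is a minimal sketch of trimming the file down before handing it to the admin client. The `AdminProps` class, its `adminOnly` method, and the allowlist are hypothetical helpers for illustration, not part of the Kafka API, and the sketch uses only the JDK so it runs without Kafka on the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper (not a Kafka class): keeps only admin-relevant keys.
final class AdminProps {
    // Assumed allowlist for this sketch; extend it with whatever your setup needs.
    private static final Set<String> ADMIN_KEYS = new HashSet<>(
            Arrays.asList("bootstrap.servers", "client.id", "request.timeout.ms"));

    // Parses properties text, e.g. the contents of kafka.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns a copy containing only the keys listed in ADMIN_KEYS.
    static Properties adminOnly(Properties all) {
        Properties admin = new Properties();
        for (String key : all.stringPropertyNames()) {
            if (ADMIN_KEYS.contains(key)) {
                admin.setProperty(key, all.getProperty(key));
            }
        }
        return admin;
    }

    public static void main(String[] args) {
        String file = "bootstrap.servers=localhost:9092\n"
                + "group.id=test\n"
                + "enable.auto.commit=true\n";
        // Only bootstrap.servers survives the filter.
        System.out.println(adminOnly(parse(file)));
    }
}
```

The filtered Properties object could then be passed to AdminClient.create(...) in place of the full file contents.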
I fixed it, after a lot of research.
    ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
    AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
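With the code above, ZkClient does create the topic, but Kafka cannot recognize the topic information it writes. ZkClient's default serializer uses Java object serialization, whereas Kafka expects plain strings in ZooKeeper. So the ZkClient object has to be created in the following way.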
First, add the following import:
    import kafka.utils.ZKStringSerializer$;
Then create the ZkClient object like this:
    ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
    AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
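To see why the serializer matters, here is a small JDK-only sketch that compares the bytes produced by Java object serialization with the plain UTF-8 bytes of the same string. It assumes that a serializer built on ObjectOutputStream behaves like the one ZkClient uses when none is supplied. The `SerializerComparison` class and the payload are illustrative, not taken from Kafka.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustrative class, not part of Kafka or ZkClient.
final class SerializerComparison {
    // Bytes produced by Java object serialization (ObjectOutputStream).
    static byte[] javaSerialized(String value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Plain UTF-8 bytes, which is what a string serializer writes.
    static byte[] plainUtf8(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Upper-case hex of the first `limit` bytes.
    static String hex(byte[] bytes, int limit) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < Math.min(limit, bytes.length); i++) {
            sb.append(String.format("%02X", bytes[i]));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String payload = "{\"version\":1}"; // made-up payload for the demo
        System.out.println(hex(javaSerialized(payload), 4)); // prints ACED0005
        System.out.println(hex(plainUtf8(payload), 4));      // prints 7B227665
    }
}
```

The Java-serialized form starts with the stream header ACED0005 rather than the text itself, so a reader that expects a plain string sees extra binary bytes in front of the data.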
Because the API has changed, the code above does not work with Kafka > 0.9. For Kafka > 0.9, use the following code:
    import java.util.Properties;

    import kafka.admin.AdminUtils;
    import kafka.utils.ZKStringSerializer$;
    import kafka.utils.ZkUtils;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.ZkConnection;

    public class KafkaTopicCreationInJava {
        public static void main(String[] args) throws Exception {
            ZkClient zkClient = null;
            ZkUtils zkUtils = null;
            try {
                // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
                String zookeeperHosts = "192.168.20.1:2181";
                int sessionTimeOutInMs = 15 * 1000; // 15 secs
                int connectionTimeOutInMs = 10 * 1000; // 10 secs

                zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
                zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

                String topicName = "testTopic";
                int noOfPartitions = 2;
                int noOfReplication = 3;
                Properties topicConfiguration = new Properties();

                AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (zkClient != null) {
                    zkClient.close();
                }
            }
        }
    }
For those trying to do this in Kafka v0.10.2.1 and running into the serialization error 'java.io.StreamCorruptedException: invalid stream header: 3139322E', here is working sample code with the necessary imports.
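The hex digits in that error message are worth decoding: 31 39 32 2E is the ASCII text "192.", which suggests that a Java-object deserializer was handed plain text beginning with an IP address. Here is a JDK-only sketch that reproduces the same message. The `StreamHeaderDemo` class is illustrative, and the address is the sample one used earlier in this thread, not necessarily the data involved in the original failure.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;
import java.nio.charset.StandardCharsets;

// Illustrative class, not part of Kafka or ZkClient.
final class StreamHeaderDemo {
    // Decodes a hex string such as "3139322E" into ASCII text.
    static String hexToAscii(String hex) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i + 1 < hex.length(); i += 2) {
            sb.append((char) Integer.parseInt(hex.substring(i, i + 2), 16));
        }
        return sb.toString();
    }

    // Tries to read plain text as a Java-serialized object and
    // returns the resulting error message.
    static String headerError(String plainText) {
        byte[] bytes = plainText.getBytes(StandardCharsets.UTF_8);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            in.readObject();
            return "no error";
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            return e.toString();
        }
    }

    public static void main(String[] args) {
        System.out.println(hexToAscii("3139322E"));      // prints 192.
        System.out.println(headerError("192.168.20.1")); // prints invalid stream header: 3139322E
    }
}
```

In other words, the data in ZooKeeper is a plain string, and the client has to be configured with a string serializer to read it.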
    import java.util.Properties;

    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.ZkConnection;
    import org.I0Itec.zkclient.exception.ZkMarshallingError;
    import org.I0Itec.zkclient.serialize.ZkSerializer;

    import kafka.admin.AdminUtils;
    import kafka.admin.RackAwareMode;
    import kafka.utils.ZKStringSerializer;
    import kafka.utils.ZkUtils;

    public static void createTopic(String topicName, int numPartitions, int numReplication) {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            String zookeeperHosts = "199.98.916.902:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs);
            // Ref: https://gist.github.com/jjkoshy/3842975
            // Store data as plain strings instead of Java-serialized objects.
            zkClient.setZkSerializer(new ZkSerializer() {
                @Override
                public byte[] serialize(Object o) throws ZkMarshallingError {
                    return ZKStringSerializer.serialize(o);
                }

                @Override
                public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                    return ZKStringSerializer.deserialize(bytes);
                }
            });
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            Properties topicConfiguration = new Properties();

            // Use the method parameters rather than hard-coded partition and replication counts.
            AdminUtils.createTopic(zkUtils, topicName, numPartitions, numReplication,
                    topicConfiguration, RackAwareMode.Enforced$.MODULE$);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
Just a pointer for anyone looking at this with a newer version of Kafka (at the time of writing, I am using Kafka v0.10.0.0).
You have to change:
    AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, topicConfiguration);
to the following:
    AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, topicConfiguration, RackAwareMode.Enforced$.MODULE$);
It is also a good idea to close the connection when you are done:
    zkClient.close();