I wanted to play around with Kafka. After a quick install I ran a simple demo, and it just hung, even though sending messages from the command line worked fine. My Kafka version is 2.6, and the demo code is:

package top.jlice;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            // fire-and-forget: send() returns a Future, but no result is checked here
            producer.send(new ProducerRecord<String, String>("test", "jlice.top-" + i));
        }
        producer.close();
    }
}

Debugging showed it was stuck on this line inside the waitOnMetadata method:

metadata.awaitUpdate(version, remainingWaitMs);

Although KafkaProducer.send is asynchronous, it can still block. KafkaProducer.send returns a Future<RecordMetadata>; to obtain the result, call its get method:

try {
    for (int i = 0; i < 10; i++) {
        // get() blocks until the send completes and rethrows any failure as an ExecutionException
        producer.send(new ProducerRecord<String, String>("test", "jlice.top-" + i)).get();
    }
} catch (Exception e) {
    e.printStackTrace();
}
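Blocking on get() for every record defeats the point of an asynchronous producer, though. KafkaProducer.send also has an overload that takes a Callback, which fires once the send succeeds or fails. A minimal sketch of that non-blocking variant, reusing the topic name and message format from the demo above (the class name MyCallbackProducer is made up for illustration):

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class MyCallbackProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test", "jlice.top-" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // exception is non-null when the send failed, e.g. the metadata timeout above
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.println("sent to partition " + metadata.partition()
                                    + " @ offset " + metadata.offset());
                        }
                    }
                });
            }
        }
    }
}
```

This way failures like the TimeoutException still surface, but the send loop never blocks.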

Kafka's default timeout is 60 s, so after this change the error appears after 60 s:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.
	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1314)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:970)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:870)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:758)
	at top.jlice.MyProducer.main(MyProducer.java:17)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.

Of course, you can also set a shorter timeout in properties, say 1 s, so the error shows up faster →_→

properties.put("max.block.ms", 1000);
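If bare string keys feel typo-prone, kafka-clients also ships a ProducerConfig class with constants for these keys. A small sketch of the same setting using it (nothing here is specific to this bug; it is just the constant-based spelling):

```java
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class ProducerProps {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // fail fast: block at most 1 s waiting for metadata instead of the 60 s default
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
        System.out.println(ProducerConfig.MAX_BLOCK_MS_CONFIG); // the constant resolves to "max.block.ms"
    }
}
```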

Clicking through the Kafka version in pom.xml to kafka-clients-2.6.0.pom, I found the following dependency configuration:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.2</version>
    <scope>provided</scope>
</dependency>

Note that the scope is provided; the Maven site explains this value as:

this is much like compile, but indicates you expect the JDK or a container to provide the dependency at runtime.

I figured the problem might be this missing dependency, so I added it to my pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.10.2</version>
    </dependency>
</dependencies>

After that, everything worked.

References