How to produce and consume messages from Apache Kafka with Java?
Jul 11, 2025 am 01:43 AMThe key to producing and consuming Apache Kafka messages using Java is to properly configure the Producer and Consumer APIs and understand their basic processes. 1. First add Kafka client dependencies to ensure that the version is compatible with the cluster; 2. When writing producers, configure bootstrap.servers, key.serializer and value.serializer, and create a KafkaProducer instance to send messages, pay attention to closing resources and optional callback processing; 3. When writing consumers, configure group.id, deserializer, etc., use KafkaConsumer to subscribe to topics and pull messages in a loop, pay attention to the strategy of submitting offsets; 4. Common problems include group.id conflicts, inappropriate submission of offsets, and incorrect resource closing, etc. It is recommended to test the verification process in a local stand-alone environment first.
It is actually not difficult to produce and consume Apache Kafka messages in Java. The core lies in understanding the basic concepts of Kafka and mastering the use of Producer and Consumer APIs. As long as the configuration is properly and the code structure is clear, the sending and receiving of messages can be easily achieved.

Add Kafka dependencies
Before you start writing code, you need to introduce Kafka's client library into your project. If you are using Maven, you can add the following dependencies in pom.xml
:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.6.0</version> </dependency>
This version is relatively new and relatively stable. Of course, you can also select the corresponding client version based on your Kafka cluster version.

Write a Kafka Producer
The producer's task is to send messages to the Kafka topic. The main steps include configuring properties, creating a Producer instance, building a message record and sending.
Key configuration items are:

-
bootstrap.servers
: Kafka broker address -
key.serializer
andvalue.serializer
: Specify the serialization method of key and value.StringSerializer
is generally used.
The sample code is as follows:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value"); producer.send(record); producer.close();
Some points to note:
- It is best to call
close()
after sending to avoid resource leakage - If you want to confirm whether the sending is successful, you can add the callback function.send
.send(record, callback)
- If it's just a test, you can omit the key
Write Kafka Consumer
The consumer is responsible for reading messages from the Kafka topic. Compared with producers, the consumer's logic is a little more complicated because it requires actively pulling messages, processing offsets, etc.
Key configurations include:
-
bootstrap.servers
: also specify the broker address -
group.id
: consumer group ID, must be set, otherwise an error will be reported -
key.deserializer
andvalue.deserializer
: Deserializer, usually usingStringDeserializer
A simple consumer process is as follows:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("Received: " record.value()); } }
Note:
-
poll()
method will pull a batch of data and process it in a loop - Don’t forget to submit offset after consumption, you can choose to submit automatically or manually
- If you want to consume only once and then exit, you can break loop after processing
Frequently Asked Questions and Precautions
Sometimes you will find that the message has not been received or has been consumed repeatedly, which is usually caused by the following reasons:
- Consumer group.id setting error or conflict
- Automatically submit offset frequency is too high or too low
- Failure to close the producer or consumer correctly results in inconsistent status
- Kafka broker is not started or the network is not connected
It is recommended to first run the local stand-alone environment during development and then go to the cluster.
Basically that's it. The process of connecting to Kafka in Java is not complicated, but some details are easy to ignore, such as serialization, group.id, and submission strategies. As long as you pay attention to the configuration and process, you can run quickly.
The above is the detailed content of How to produce and consume messages from Apache Kafka with Java?. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undress AI Tool
Undress images for free

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics

The rational use of semantic tags in HTML can improve page structure clarity, accessibility and SEO effects. 1. Used for independent content blocks, such as blog posts or comments, it must be self-contained; 2. Used for classification related content, usually including titles, and is suitable for different modules of the page; 3. Used for auxiliary information related to the main content but not core, such as sidebar recommendations or author profiles. In actual development, labels should be combined and other, avoid excessive nesting, keep the structure simple, and verify the rationality of the structure through developer tools.

When you encounter the prompt "This operation requires escalation of permissions", it means that you need administrator permissions to continue. Solutions include: 1. Right-click the "Run as Administrator" program or set the shortcut to always run as an administrator; 2. Check whether the current account is an administrator account, if not, switch or request administrator assistance; 3. Use administrator permissions to open a command prompt or PowerShell to execute relevant commands; 4. Bypass the restrictions by obtaining file ownership or modifying the registry when necessary, but such operations need to be cautious and fully understand the risks. Confirm permission identity and try the above methods usually solve the problem.

There are three main differences between Callable and Runnable in Java. First, the callable method can return the result, suitable for tasks that need to return values, such as Callable; while the run() method of Runnable has no return value, suitable for tasks that do not need to return, such as logging. Second, Callable allows to throw checked exceptions to facilitate error transmission; while Runnable must handle exceptions internally. Third, Runnable can be directly passed to Thread or ExecutorService, while Callable can only be submitted to ExecutorService and returns the Future object to

Javaprovidesmultiplesynchronizationtoolsforthreadsafety.1.synchronizedblocksensuremutualexclusionbylockingmethodsorspecificcodesections.2.ReentrantLockoffersadvancedcontrol,includingtryLockandfairnesspolicies.3.Conditionvariablesallowthreadstowaitfor

Java's class loading mechanism is implemented through ClassLoader, and its core workflow is divided into three stages: loading, linking and initialization. During the loading phase, ClassLoader dynamically reads the bytecode of the class and creates Class objects; links include verifying the correctness of the class, allocating memory to static variables, and parsing symbol references; initialization performs static code blocks and static variable assignments. Class loading adopts the parent delegation model, and prioritizes the parent class loader to find classes, and try Bootstrap, Extension, and ApplicationClassLoader in turn to ensure that the core class library is safe and avoids duplicate loading. Developers can customize ClassLoader, such as URLClassL

The key to Java exception handling is to distinguish between checked and unchecked exceptions and use try-catch, finally and logging reasonably. 1. Checked exceptions such as IOException need to be forced to handle, which is suitable for expected external problems; 2. Unchecked exceptions such as NullPointerException are usually caused by program logic errors and are runtime errors; 3. When catching exceptions, they should be specific and clear to avoid general capture of Exception; 4. It is recommended to use try-with-resources to automatically close resources to reduce manual cleaning of code; 5. In exception handling, detailed information should be recorded in combination with log frameworks to facilitate later

Java supports asynchronous programming including the use of CompletableFuture, responsive streams (such as ProjectReactor), and virtual threads in Java19. 1.CompletableFuture improves code readability and maintenance through chain calls, and supports task orchestration and exception handling; 2. ProjectReactor provides Mono and Flux types to implement responsive programming, with backpressure mechanism and rich operators; 3. Virtual threads reduce concurrency costs, are suitable for I/O-intensive tasks, and are lighter and easier to expand than traditional platform threads. Each method has applicable scenarios, and appropriate tools should be selected according to your needs and mixed models should be avoided to maintain simplicity

Static keywords are used in Java to create variables and methods that belong to the class itself, rather than instances of the class. 1. Static variables are shared by instances of all classes and are suitable for storing data shared by all objects, such as schoolName in the Student class. 2. Static methods belong to classes and do not depend on objects. They are often used in tool functions, such as Math.sqrt(), and can only access other static members. 3. Static code blocks are used to perform initialization operations when class loading, such as loading libraries or setting logs. 4. Static inner classes can be instantiated independently of the external class, but non-static members of the external class cannot be accessed. Rational use of static can effectively manage class-level resources and behaviors.
