Wie produziere und konsumiere ich Nachrichten von Apache Kafka mit Java?
Jul 11, 2025 am 01:43 AMDer Schlüssel zum Erstellen und Verbrauch von Apache -Kafka -Nachrichten mithilfe von Java liegt darin, die Hersteller- und Verbraucher -APIs ordnungsgem?? zu konfigurieren und ihre grundlegenden Prozesse zu verstehen. 1. Fügen Sie zuerst Kafka -Client -Abh?ngigkeiten hinzu, um sicherzustellen, dass die Version mit dem Cluster kompatibel ist. 2. Konfigurieren Sie beim Schreiben von Produzenten Bootstrap.Servers, Key.Serializer und Value.Serializer und erstellen Sie eine Kafkaproducer -Instanz, um Nachrichten zu senden, achten Sie auf das Schlie?en der Ressourcen und die optionale Rückrufverarbeitung. 3. Wenn Sie Verbraucher schreiben, Group.ID, Deserializer usw. konfigurieren, verwenden Sie Kafkaconsumer, um Themen zu abonnieren und Nachrichten in einer Schleife zu ziehen, auf die Strategie der übermittlung von Offsets achten. 4. H?ufige Probleme sind Gruppen.ID-Konflikte, unangemessene Einreichung von Offsets und ein falsches Ressourcenschlie? usw. Es wird empfohlen, den überprüfungsprozess zuerst in einer lokalen eigenst?ndigen Umgebung zu testen.
Es ist eigentlich nicht schwierig, Apache -Kafka -Nachrichten in Java zu produzieren und zu konsumieren. Der Kern liegt darin, die grundlegenden Konzepte von Kafka zu verstehen und die Verwendung von Produzenten- und Verbraucher -APIs zu beherrschen. Solange die Konfiguration ordnungsgem?? ist und die Codestruktur klar ist, kann das Senden und Empfangen von Nachrichten leicht erreicht werden.

Fügen Sie Kafka -Abh?ngigkeiten hinzu
Bevor Sie mit dem Schreiben von Code beginnen, müssen Sie die Kundenbibliothek von Kafka in Ihr Projekt einführen. Wenn Sie Maven verwenden, k?nnen Sie die folgenden Abh?ngigkeiten in pom.xml
hinzufügen:
<De vorangehen> <gruppe> org.apache.kafka </Groupid> <artifactid> Kafka-Clients </artifactid> <version> 3.6.0 </Version> </abh?ngig>
Diese Version ist relativ neu und relativ stabil. Natürlich k?nnen Sie auch die entsprechende Client -Version basierend auf Ihrer Kafka -Clusterversion ausw?hlen.

Schreiben Sie einen Kafka -Produzenten
Die Aufgabe des Produzenten besteht darin, Nachrichten an das Kafka -Thema zu senden. Zu den Hauptschritten geh?rt das Konfigurieren von Eigenschaften, das Erstellen einer Produzenteninstanz, das Erstellen eines Nachrichtendatensatzes und das Senden.
Schlüsselkonfigurationselemente sind:

-
bootstrap.servers
: Kafka Broker -Adresse -
key.serializer
undvalue.serializer
: Geben Sie die Serialisierungsmethode von Schlüssel und Wert an.StringSerializer
wird im Allgemeinen verwendet.
Der Beispielcode lautet wie folgt:
Eigenschaften props = neue Eigenschaften (); props.put ("bootstrap.servers", "localhost: 9092"); props.put ("key.serializer", "org.apache.kafka.common.serialization.Stringserializer"); props.put ("value.serializer", "org.apache.kafka.common.Serialization.Stringserializer"); Produzent <String, String> Produzent = neuer KafkaproDucer <> (Requisiten); ProducTeRecord <String, String> record = new ProducerRecord <> ("my-topic", "key", "value"); Produzent.Send (Record); Produzent.CLOSE ();
Einige Punkte zu beachten:
- Nach dem Senden ist es am besten,
close()
anzurufen, um Ressourcenleckage zu vermeiden - Wenn Sie best?tigen m?chten, ob das Senden erfolgreich ist, k?nnen Sie die Callback -Funktion addieren
.send(record, callback)
- Wenn es sich nur um einen Test handelt, k?nnen Sie den Schlüssel weglassen
Schreiben Sie den Kafka -Verbraucher
Der Verbraucher ist dafür verantwortlich, Nachrichten aus dem Thema Kafka zu lesen. Im Vergleich zu Produzenten ist die Logik des Verbrauchers etwas komplizierter, da sie aktiv an Nachrichten, Offsets usw. verarbeitet werden müssen.
Zu den wichtigsten Konfigurationen geh?ren:
-
bootstrap.servers
: Geben Sie auch die Broker -Adresse an -
group.id
: Verbrauchergruppen -ID, muss festgelegt werden, andernfalls wird ein Fehler gemeldet -
key.deserializer
undvalue.deserializer
: Deserializer, normalerweise mitStringDeserializer
Ein einfacher Verbraucherprozess ist wie folgt:
Eigenschaften props = neue Eigenschaften (); props.put ("bootstrap.servers", "localhost: 9092"); props.put ("Gruppe.id", "Testgruppe"); props.put ("key.deserializer", "org.apache.kafka.common.serialization.stringDeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.stringDeserializer"); Kafkaconsumer <String, String> Consumer = new Kafkaconsumer <> (Requisiten); Consumer. while (wahr) { CENSPEERRECORDS <String, String> Records = Consumer.Poll (Duration.ofmillis (100)); für (CENSELERRECORD <STRING, STRING> DRECKREISE: Datens?tze) { System.out.println ("empfangen:" record.Value ()); } }
Notiz:
-
poll()
-Methode zieht eine Menge Daten und verarbeitet sie in einer Schleife - Vergessen Sie nicht, nach dem Verbrauch einen Offset einzureichen. Sie k?nnen w?hlen, ob Sie automatisch oder manuell einreichen m?chten
- Wenn Sie nur einmal konsumieren und dann beenden m?chten, k?nnen Sie die Schleife nach der Verarbeitung brechen
H?ufig gestellte Fragen und Vorsichtsma?nahmen
Manchmal werden Sie feststellen, dass die Nachricht nicht wiederholt empfangen oder konsumiert wurde, was normalerweise aus den folgenden Gründen verursacht wird:
- Consumer Group.ID Einstellungsfehler oder Konflikt
- Die automatische übermittlung der Versatzfrequenz ist zu hoch oder zu niedrig
- Das Vers?umnis, den Hersteller oder Verbraucher nicht zu schlie?en, führt zu einem inkonsistenten Status
- Der Kafka -Broker wird nicht gestartet oder das Netzwerk ist nicht verbunden
Es wird empfohlen, zuerst die lokale eigenst?ndige Umgebung w?hrend der Entwicklung zu leiten und dann zum Cluster zu gehen.
Grunds?tzlich ist das. Der Prozess der Verbindung zu Kafka in Java ist nicht kompliziert, aber einige Details sind leicht zu ignorieren, wie z. B. Serialisierung, Gruppe.ID und Einreichungsstrategien. Solange Sie auf die Konfiguration und den Prozess achten, k?nnen Sie schnell ausführen.
Das obige ist der detaillierte Inhalt vonWie produziere und konsumiere ich Nachrichten von Apache Kafka mit Java?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Hei?e KI -Werkzeuge

Undress AI Tool
Ausziehbilder kostenlos

Undresser.AI Undress
KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover
Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Clothoff.io
KI-Kleiderentferner

Video Face Swap
Tauschen Sie Gesichter in jedem Video mühelos mit unserem v?llig kostenlosen KI-Gesichtstausch-Tool aus!

Hei?er Artikel

Hei?e Werkzeuge

Notepad++7.3.1
Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version
Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1
Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6
Visuelle Webentwicklungstools

SublimeText3 Mac-Version
Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Hei?e Themen

Die rationale Verwendung semantischer Tags in HTML kann die Klarheit, Zug?nglichkeit und SEO -Effekte der Seitenstruktur verbessern. 1. für unabh?ngige Inhaltsbl?cke wie Blog-Beitr?ge oder Kommentare muss sie in sich geschlossen werden. 2. für klassifizierungsbezogene Inhalte, die normalerweise Titel enthalten, ist für verschiedene Module der Seite geeignet. 3.. Wird für Hilfsinformationen im Zusammenhang mit dem Hauptinhalt verwendet, nicht jedoch Kern, wie z. B. Seitenleistenempfehlungen oder Autorprofile. In der tats?chlichen Entwicklung sollten Etiketten kombiniert und andere, überm??ige Verschachtelung vermeiden, die Struktur einfach halten und die Rationalit?t der Struktur durch Entwicklerwerkzeuge überprüfen.

Wenn Sie auf die Eingabeaufforderung sto?en "Dieser Vorgang erfordert die Eskalation der Berechtigungen", müssen Sie die Administratorberechtigungen ben?tigen, um fortzufahren. Zu den L?sungen geh?ren: 1. Klicken Sie mit der rechten Maustaste auf das Programm "AS Administrator ausführen" oder setzen Sie die Verknüpfung so fest, dass immer als Administrator ausgeführt wird. 2. überprüfen Sie, ob es sich bei dem Girokonto um ein Administratorkonto handelt, falls nicht die Unterstützung von Administratoren, wenn nicht. 3.. Verwenden Sie Administratorberechtigungen, um eine Eingabeaufforderung oder eine PowerShell zu ?ffnen, um relevante Befehle auszuführen. 4. Umgehen Sie die Beschr?nkungen, indem Sie bei Bedarf Dateienbesitz oder ?nderungen der Registrierung ?ndern. Solche Vorg?nge müssen jedoch vorsichtig sein und die Risiken vollst?ndig verstehen. Best?tigen Sie die Erlaubnisidentit?t und versuchen Sie die oben genannten Methoden normalerweise l?sen.

Es gibt drei Hauptunterschiede zwischen Callable und Runnable in Java. Zun?chst kann die Callable -Methode das Ergebnis zurückgeben, das für Aufgaben geeignet ist, die Werte wie Callable zurückgeben müssen. W?hrend die Run () -Methode von Runnable keinen Rückgabewert hat, geeignet für Aufgaben, die nicht zurückkehren müssen, z. B. die Protokollierung. Zweitens erm?glicht Callable überprüfte Ausnahmen, um die Fehlerübertragung zu erleichtern. w?hrend laufbar Ausnahmen innen verarbeiten müssen. Drittens kann Runnable direkt an Thread oder Executorservice übergeben werden, w?hrend Callable nur an ExecutorService übermittelt werden kann und das zukünftige Objekt an zurückgibt

JavaprovidesMultiPLesynchronizationToolsForthreadsafety.1.SynchronizedblocksensuremutualexclusionByLockingMethodSorspecificcodesction.2.REENNRANTLANTLOCKOFFERSADVEDCONTROL, einschlie?lich TrylockandfairnessPolicies.

Der Klassenladermechanismus von Java wird über den Classloader implementiert und sein Kernworkflow ist in drei Stufen unterteilt: Laden, Verknüpfung und Initialisierung. W?hrend der Ladephase liest Classloader den Bytecode der Klasse dynamisch und erstellt Klassenobjekte. Zu den Links geh?ren die überprüfung der Richtigkeit der Klasse, die Zuweisung von Ged?chtnissen für statische Variablen und das Parsen von Symbolreferenzen; Die Initialisierung führt statische Codebl?cke und statische Variablenzuordnungen durch. Die Klassenbelastung übernimmt das übergeordnete Delegationsmodell und priorisiert den übergeordneten Klassenlader, um Klassen zu finden, und probieren Sie Bootstrap, Erweiterung und ApplicationClassloader. Entwickler k?nnen Klassenloader wie URLASSL anpassen

Der Schlüssel zur Behandlung von Java-Ausnahme besteht darin, zwischen überprüften und ungeprüften Ausnahmen zu unterscheiden und Try-Catch schlie?lich und angemessen zu verwenden. 1. überprüfte Ausnahmen wie IOException müssen gezwungen werden, um zu handhaben, was für erwartete externe Probleme geeignet ist. 2. Unkontrollierte Ausnahmen wie NullPointerexception werden normalerweise durch Programmlogikfehler verursacht und sind Laufzeitfehler. 3. Wenn Sie Ausnahmen erfassen, sollten sie spezifisch und klar sein, um die allgemeine Erfassung von Ausnahme zu vermeiden. 4.. Es wird empfohlen, Try-with-Resources zu verwenden, um die Ressourcen automatisch zu schlie?en, um die manuelle Reinigung des Codes zu verringern. 5. In der Ausnahmebehandlung sollten detaillierte Informationen in Kombination mit Protokoll -Frameworks aufgezeichnet werden, um sie sp?ter zu erleichtern

Java unterstützt asynchrone Programmierungen, einschlie?lich der Verwendung von Vervollst?ndigungsfuture, reaktionsschnellen Streams (wie Projecreactor) und virtuellen Threads in Java19. 1.CompletableFuture verbessert die Code -Lesbarkeit und -wartung durch Kettenaufrufe und unterstützt Aufgabenorchestrierung und Ausnahmebehandlung. 2. Projecreactor bietet Mono- und Flusstypen zur Implementierung der reaktionsschnellen Programmierung mit Backpressure -Mechanismus und reichhaltigen Operatoren. 3.. Virtuelle Themen senken die Parallelit?tskosten, sind für E/O-intensive Aufgaben geeignet und sind leichter und leichter zu erweitern als herk?mmliche Plattformf?den. Jede Methode hat anwendbare Szenarien, und entsprechende Tools sollten entsprechend Ihren Anforderungen ausgew?hlt werden, und gemischte Modelle sollten vermieden werden, um die Einfachheit aufrechtzuerhalten

In Java werden statische Schlüsselw?rter verwendet, um Variablen und Methoden zu erstellen, die zur Klasse selbst geh?ren, und nicht Instanzen der Klasse. 1. statische Variablen werden durch Instanzen aller Klassen geteilt und sind für die Speicherung von Daten geeignet, die von allen Objekten geteilt werden, z. B. Schulname in der Schülerklasse. 2. statische Methoden geh?ren zu Klassen und h?ngen nicht von Objekten ab. Sie werden h?ufig in Werkzeugfunktionen wie Math.sqrt () verwendet und k?nnen nur auf andere statische Mitglieder zugreifen. 3.. Statische Codebl?cke werden zum Ausführen von Initialisierungsvorg?ngen beim Laden der Klassen verwendet, z. B. das Laden von Bibliotheken oder das Einstellen von Protokollen. 4. Statische innere Klassen k?nnen unabh?ngig von der externen Klasse instanziiert werden, aber auf nicht statische Mitglieder der externen Klasse k?nnen nicht zugegriffen werden. Die rationale Verwendung von statischer kann effektiv Ressourcen und Verhaltensweisen auf Klassenebene verwalten.
