国产av日韩一区二区三区精品,成人性爱视频在线观看,国产,欧美,日韩,一区,www.成色av久久成人,2222eeee成人天堂

Heim Web-Frontend js-Tutorial Kafka-Grundlagen mit einem praktischen Beispiel

Kafka-Grundlagen mit einem praktischen Beispiel

Dec 28, 2024 am 09:26 AM

In den letzten Wochen habe ich mich intensiv mit Kafka besch?ftigt und mir nebenbei Notizen gemacht. Ich habe beschlossen, daraus einen Blog-Beitrag zu organisieren und zu strukturieren, in dem neben Konzepten und Tipps auch ein praktisches Beispiel enthalten ist NestJS und KafkaJs.

Was ist Kafka?

Apache Kafka ist eine verteilte Event-Streaming-Plattform, die für die Verarbeitung von Echtzeitereignissen entwickelt wurde. Es erm?glicht das Speichern, Verarbeiten und Abrufen gro?er Datenstr?me mit hohem Durchsatz und geringer Latenz und eignet sich daher für den Aufbau von Echtzeit-Datenpipelines und ereignisgesteuerten Anwendungen.

Hauptmerkmale:

  • Ereignis-Streaming: Kafka organisiert Daten in Themen, bei denen es sich um geordnete Protokolle von Ereignissen handelt.
  • Verteilte Architektur: Kafka ist auf Skalierbarkeit und Fehlertoleranz ausgelegt. Es fungiert als Cluster von Knoten, die als Broker bezeichnet werden, und kann Daten auf mehrere Server verteilen.
  • Publish-Subscribe-Modell: Produzenten schreiben Nachrichten zu den Themen und Konsumenten lesen Nachrichten von ihnen. Kafka unterstützt mehrere Verbraucher, sodass verschiedene Anwendungen unabh?ngig voneinander denselben Datenstrom verarbeiten k?nnen.
  • Hohe Leistung: Kafka ist für hohen Durchsatz optimiert und verarbeitet Millionen von Nachrichten pro Sekunde mit geringer Latenz.
  • Dauerhafter Speicher: Kafka speichert Nachrichten auf der Festplatte mit konfigurierbaren Aufbewahrungsfristen und gew?hrleistet so Datenpersistenz und Zuverl?ssigkeit.
  • Partitionierung und Replikation: Themen werden zur Skalierbarkeit in Partitionen unterteilt und zur Fehlertoleranz über Broker hinweg repliziert.
  • Wiederspielbarkeit: Verbraucher k?nnen Nachrichten erneut lesen, indem sie ihren Offset zurücksetzen und so die Daten erneut verarbeiten oder wiederherstellen.
  • Integration und ?kosystem: Kafka l?sst sich in verschiedene Systeme integrieren und verfügt über Tools wie Kafka Connect (zur Datenintegration) und Kafka Streams (zur Stream-Verarbeitung).

Vorteile

  • Zuverl?ssigkeit: Es gew?hrleistet Fehlertoleranz durch Datenverteilung, Replikation und Partitionierung.
  • Skalierbarkeit: Kafka kann riesige Datenmengen verarbeiten und ohne Ausfallzeiten horizontal skalieren.
  • Haltbarkeit:Nachrichten werden umgehend gespeichert, wodurch Ausfallsicherheit und Datenpersistenz gew?hrleistet sind.
  • Leistung: Kafka beh?lt auch bei extremer Datenlast eine hohe Leistung bei und verarbeitet gro?e Datenmengen ohne Ausfallzeiten oder Datenverlust.

Nachteile

Diese Kompromisse sind bewusste Designentscheidungen, um die Leistung von Kafka zu maximieren, k?nnen jedoch bei Anwendungsf?llen, die eine gr??ere Flexibilit?t erfordern, eine Herausforderung darstellen:

  • Eingeschr?nkte Flexibilit?t: Kafka bietet keine Unterstützung für erweiterte Abfragen, beispielsweise das Filtern bestimmter Daten in Berichten. Verbraucher müssen diese Aufgaben erledigen, da Kafka Nachrichten durch Offsets in der Reihenfolge abruft, in der sie empfangen werden.
  • Nicht für die Langzeitspeicherung konzipiert: Kafka zeichnet sich durch das Streamen von Daten aus, eignet sich jedoch nicht für die Speicherung historischer Daten über l?ngere Zeitr?ume. Datenduplizierung kann die Speicherung gro?er Datenmengen kostspielig machen.
  • Keine Unterstützung für Wildcard-Themen: Kafka erlaubt nicht die Nutzung mehrerer Themen mithilfe von Wildcard-Mustern (z. B. log-2024-*).

Anwendungsf?lle

  • Echtzeitanalyse: Datenstr?me verarbeiten und analysieren, sobald sie auftreten.
  • Ereignisbeschaffung: Zeichnen Sie alle ?nderungen am Status einer Anwendung als Abfolge von Ereignissen auf.
  • Protokollaggregation: Protokolle von verteilten Systemen sammeln und verwalten.
  • Datenpipelines:Daten zwischen Systemen zuverl?ssig und effizient streamen.
  • IoT-Anwendungen:Verarbeiten Sie Sensordaten von IoT-Ger?ten in Echtzeit.

Wie funktioniert Kafka?

Kafka integriert die Funktionen von Warteschlangen- und Publish-Subscribe-Messaging-Modellen und bietet Verbrauchern die Vorteile beider Ans?tze.

  • Warteschlangen erm?glichen eine skalierbare Datenverarbeitung durch die Verteilung von Aufgaben auf mehrere Verbraucherinstanzen. Herk?mmliche Warteschlangen unterstützen jedoch nicht mehrere Abonnenten.
  • Das Publish-Subscribe-Modell unterstützt mehrere Abonnenten, kann Aufgaben jedoch nicht auf mehrere Arbeitsprozesse verteilen, da jede Nachricht an alle Abonnenten gesendet wird.

Kafka verwendet ein partitioniertes Protokollsystem, um die Vorteile von Warteschlangen- und Publish-Subscribe-Modellen zu kombinieren. Protokolle, bei denen es sich um geordnete Sequenzen von Datens?tzen handelt, werden in Partitionen unterteilt, wobei jede Partition verschiedenen Abonnenten (Konsumenten) zugewiesen wird. Dieses Setup erm?glicht es mehreren Abonnenten, ein Thema zu teilen und gleichzeitig die Skalierbarkeit beizubehalten.

Kafka fundamentals with a practical example

Ereignisse, Themen und Partitionen

Wir haben gesehen, dass Kafka eine Plattform ist, die für die Verarbeitung von Echtzeitereignissen entwickelt wurde. Bevor wir darüber sprechen, wie mit diesen Ereignissen umgegangen wird, müssen wir eine Definition dafür haben:

Ein Ereignis ist eine Aktion, ein Vorfall oder eine ?nderung, die in Anwendungen aufgezeichnet wird, zum Beispiel eine Zahlung, ein Website-Klick oder eine Temperaturmessung.

Ereignisse werden in Kafka als Schlüssel/Wert-Paare modelliert, wobei sowohl Schlüssel als auch Werte in Bytesequenzen serialisiert werden.

  • Werte stellen h?ufig serialisierte Dom?nenobjekte oder Roheingaben dar, wie z. B. Sensorausgaben oder andere Anwendungsdaten. Sie fassen die Kerninformationen zusammen, die im Kafka-Ereignis übermittelt werden.
  • Schlüssel k?nnen komplexe Dom?nenobjekte sein, sind jedoch h?ufig einfache Typen wie Zeichenfolgen oder Ganzzahlen. Anstatt ein einzelnes Ereignis eindeutig zu identifizieren (wie es ein Prim?rschlüssel in einer relationalen Datenbank tut), identifizieren Schlüssel normalerweise Entit?ten innerhalb des Systems, wie z. B. einen bestimmten Benutzer, eine Bestellung oder ein verbundenes Ger?t.

Kafka organisiert Ereignisse in geordneten Protokollen, die als Themen bezeichnet werden. Wenn ein externes System ein Ereignis in Kafka schreibt, wird es an das Ende eines Themas angeh?ngt. Nachrichten bleiben für eine konfigurierbare Dauer in den Themen, auch nachdem sie gelesen wurden. Im Gegensatz zu Warteschlangen sind Themen dauerhaft, repliziert und fehlertolerant und speichern Ereignisdatens?tze effizient. Protokolle k?nnen jedoch nur sequentiell gescannt und nicht abgefragt werden.

Themen werden als Protokolldateien auf der Festplatte gespeichert. Festplatten unterliegen jedoch Einschr?nkungen wie begrenzter Gr??e und E/A. Um dieses Problem zu l?sen, erm?glicht Kafka die Unterteilung von Themen in Partitionen, wodurch ein einzelnes Protokoll in mehrere Protokolle aufgeteilt wird, die auf verschiedene Server verteilt werden k?nnen. Diese Partitionierung erm?glicht Kafka eine horizontale Skalierung und verbessert so seine F?higkeit, gro?e Mengen an Ereignissen und einen hohen Durchsatz zu verarbeiten.

Kafka weist Partitionen Nachrichten basierend darauf zu, ob sie einen Schlüssel haben:

  • Kein Schlüssel: Nachrichten werden im Round-Robin-Verfahren über alle Partitionen verteilt, wodurch eine gleichm??ige Datenverteilung gew?hrleistet wird, die Nachrichtenreihenfolge jedoch nicht erhalten bleibt.
  • Mit Schlüssel: Die Partition wird durch Hashing des Schlüssels bestimmt, wodurch sichergestellt wird, dass Nachrichten mit demselben Schlüssel immer in dieselbe Partition gelangen und ihre Reihenfolge beibehalten.

Makler

Kafka arbeitet als verteilte Dateninfrastruktur unter Verwendung von Knoten, die als Broker bezeichnet werden und zusammen einen Kafka-Cluster bilden. Broker k?nnen auf Bare-Metal-Hardware, einer Cloud-Instanz, in einem von Kubernetes verwalteten Container, in Docker auf Ihrem Laptop oder überall dort ausgeführt werden, wo JVM-Prozesse ausgeführt werden k?nnen.

Makler konzentrieren sich auf:

  • Neue Ereignisse in Partitionen schreiben.
  • Bereitstellung von Lesevorg?ngen aus Partitionen.
  • Partitionen über Broker hinweg replizieren.

Sie führen keine Nachrichtenberechnung oder Weiterleitung von Thema zu Thema durch, wodurch ihr Design einfach und effizient bleibt.

Replikation

Kafka gew?hrleistet Datenhaltbarkeit und Fehlertoleranz durch die Replikation von Partitionsdaten über mehrere Broker hinweg. Die prim?re Kopie einer Partition ist das Leader-Replikat, w?hrend zus?tzliche Kopien Follower-Replikate sind. Die Daten werden an den Leader geschrieben, der Aktualisierungen automatisch an die Follower repliziert.

Dieser Replikationsprozess gew?hrleistet:

  • Datensicherheit, auch bei Broker- oder Speicherausf?llen.
  • Automatisches Failover, bei dem ein anderes Replikat die Rolle des Anführers übernimmt, wenn der aktuelle Anführer ausf?llt.

Entwickler profitieren von diesen Garantien, ohne die Replikation direkt verwalten zu müssen, da Kafka sie transparent abwickelt.

Produzenten

Ein Kafka-Produzent ist eine Client-Anwendung, die Daten an Kafka-Themen sendet (ver?ffentlicht). Es ist für die Erstellung und übermittlung von Nachrichten (Aufzeichnungen) an den Kafka-Cluster verantwortlich. Produzenten bestimmen das Thema und die Partition, in der Nachrichten basierend auf ihrer Konfiguration und dem Vorhandensein eines Nachrichtenschlüssels gespeichert werden. Die Hersteller sind verantwortlich für, aber nicht beschr?nkt auf:

  • Nachrichtenaufbau:
    • Jede Nachricht besteht aus einem Schlüssel (optional), einem Wert (den tats?chlichen Daten) und Metadaten.
    • Der Schlüssel bestimmt die Partition für die Nachricht und stellt so die Reihenfolge für Nachrichten mit demselben Schlüssel sicher.
  • Partitionszuweisung:
    • Wenn ein Schlüssel bereitgestellt wird, verwendet der Hersteller einen Hashing-Algorithmus, um die Partition zu bestimmen.
    • Ohne Schlüssel werden Nachrichten zur Lastverteilung im Round-Robin-Verfahren auf Partitionen verteilt.
  • Komprimierung:

    Produzenten k?nnen Nachrichten komprimieren, um die Netzwerkbandbreite und den Speicherverbrauch zu reduzieren.

Verbraucher

Ein Kafka-Consumer ist eine Client-Anwendung, die Nachrichten aus Kafka-Themen liest. Sie ruft Nachrichten in ihrem eigenen Tempo von Kafka-Partitionen ab und erm?glicht so eine Echtzeit- oder Stapelverarbeitung von Daten . Beachten Sie, dass Kafka keine Nachrichten an Verbraucher sendet, sondern Nachrichten von Kafka-Partitionen abruft, indem es die Daten anfordert.

Verbraucher behalten auch den überblick über die Verrechnungen, die sie verarbeitet haben. Offsets k?nnen automatisch oder manuell festgeschrieben werden, um sicherzustellen, dass keine Daten verloren gehen, wenn ein Verbraucher ausf?llt. Dies erm?glicht eine flexible Nutzung, einschlie?lich der Wiedergabe von Nachrichten durch Zurücksetzen des Offsets.

Verbrauchergruppen

Eine Verbrauchergruppe ist eine Gruppe von Verbrauchern, die zusammenarbeiten, um Daten aus bestimmten Themen zu konsumieren, was eine verteilte Verarbeitung der Nachrichten eines Themas erm?glicht.

Partitionen eines Themas werden auf die Verbraucher in der Gruppe aufgeteilt, um sicherzustellen, dass jede Nachricht nur von einem Verbraucher innerhalb der Gruppe verarbeitet wird. Mehrere Verbrauchergruppen k?nnen unabh?ngig voneinander und ohne Interferenzen dasselbe Thema konsumieren.

Wenn ein neuer Verbraucher einer Gruppe beitritt oder ein bestehender Verbraucher ausf?llt, weist Kafka Partitionen unter den Verbrauchern in der Gruppe neu zu, um sicherzustellen, dass alle Partitionen abgedeckt sind.

Serialisierung und Deserialisierung

Bei der Serialisierung und Deserialisierung in Kafka geht es um die Konvertierung von Daten zwischen ihrem Originalformat und einem Byte-Array zur übertragung und Speicherung, sodass Produzenten und Verbraucher effizient kommunizieren k?nnen.

Serialisierung

Ist der Prozess der Konvertierung eines Objekts oder einer Datenstruktur in einen Bytestrom, damit dieser übertragen oder gespeichert werden kann. Bevor ein Produzent Daten an ein Kafka-Thema sendet, serialisiert er die Daten (Schlüssel und Wert) in Byte-Arrays.

Gemeinsame Serialisierungsformate:

  • JSON: Für Menschen lesbar, weitgehend kompatibel.
  • Avro: Kompakt und effizient, schemabasiert.
  • Protobuf: Kompakt, schemabasiert und sprachunabh?ngig.
  • String: Einfache textbasierte Serialisierung.
  • Benutzerdefinierte Serialisierung: Für anwendungsspezifische Anforderungen.

Deserialisierung

Ist der umgekehrte Prozess, bei dem ein Bytestrom wieder in sein ursprüngliches Objekt oder seine ursprüngliche Datenstruktur konvertiert wird. Wenn ein Verbraucher Daten aus einem Kafka-Thema liest, deserialisiert er das Byte-Array wieder in ein für die Verarbeitung verwendbares Format.

Kompression

Komprimierung reduziert die Gr??e von Nachrichten, bevor sie gespeichert oder übertragen werden. Es optimiert die Speichernutzung, reduziert den Netzwerkbandbreitenverbrauch und verbessert die Gesamtleistung durch das Senden kleinerer Nutzlasten zwischen Produzenten, Brokern und Verbrauchern.

Wenn ein Produzent Nachrichten zu einem Kafka-Thema sendet, kann er die Nachricht vor der übertragung komprimieren. Die komprimierte Nachricht wird unver?ndert auf Brokern gespeichert und von Verbrauchern dekomprimiert, wenn sie die Nachrichten lesen.

Vorteile

  • Reduzierte Netzwerkbandbreite:Kleinere Nutzlasten bedeuten, dass weniger Daten über das Netzwerk übertragen werden.
  • Geringerer Speicherbedarf:Komprimierte Nachrichten beanspruchen weniger Speicherplatz auf der Festplatte.
  • Verbesserter Durchsatz:Kleinere Nachrichten erm?glichen eine schnellere Datenübertragung und -verarbeitung.

Wann verwenden?

  • Anwendungsf?lle mit gro?en Nachrichtengr??en: Durch die Komprimierung wird die Datengr??e erheblich reduziert.
  • Systeme mit hohem Durchsatz: Reduziert die Belastung der Netzwerk- und Speicherressourcen.
  • Batching: Die Komprimierung funktioniert am besten, wenn Ersteller mehrere Nachrichten stapelweise zusammenfassen.

W?hrend die Komprimierung Ressourcen spart, ist es wichtig, den Kompromiss zwischen CPU-Auslastung und Komprimierungsvorteilen auszugleichen und den Codec auszuw?hlen, der zu Ihrem Anwendungsfall passt.

Unterstützte Komprimierungstypen

  • Keine: Keine Komprimierung (Standard).
  • Gzip:Hohes Komprimierungsverh?ltnis, aber h?here CPU-Auslastung.
  • Schnell: Ausgewogene Komprimierungsgeschwindigkeit und CPU-Auslastung, geeignet für Echtzeit-Anwendungsf?lle.
  • LZ4:Schnellere Komprimierung und Dekomprimierung, optimiert für Systeme mit geringer Latenz.
  • Zstd: Hohe Komprimierungsrate mit besserer Leistung als Gzip, unterstützt in neueren Kafka-Versionen.

Abstimmung

Zur Optimierung der Leistung von Apache Kafka geh?rt die Feinabstimmung verschiedener Komponenten, um Durchsatz und Latenz effektiv auszubalancieren. Dieser Artikel kratzt nur an der Oberfl?che dieses Themas. Hier sind einige Aspekte, die beim Tuning von Kafka berücksichtigt werden sollten:

  • Partitionsverwaltung:

    • Partitionsanzahl: Erh?hen Sie die Anzahl der Partitionen, um Parallelit?t und Durchsatz zu verbessern. Vermeiden Sie jedoch überm??ig viele Partitionen, um Verwaltungsaufwand zu vermeiden. Passen Sie die Anzahl der Partitionen an Ihre Verbraucherkapazit?ten und die gewünschte Verbrauchsrate an.
  • Produzentenkonfiguration:

    • Batching: Konfigurieren Sie batch.size und linger.ms, um eine effiziente Stapelverarbeitung von Nachrichten zu erm?glichen, die Anzahl der Anfragen zu reduzieren und den Durchsatz zu verbessern.
    • Komprimierung: Implementieren Sie eine Komprimierung (z. B. compress.type=snappy), um die Nachrichtengr??e zu verringern und so die Netzwerk- und Speichernutzung zu reduzieren. Bedenken Sie den zus?tzlichen CPU-Overhead, der durch die Komprimierung entsteht.
  • Verbraucherkonfiguration:

    • Abrufeinstellungen: Passen Sie fetch.min.bytes und fetch.max.wait.ms an, um zu steuern, wie Verbraucher Nachrichten abrufen, und gleichen Sie Latenz und Durchsatz entsprechend den Anforderungen Ihrer Anwendung aus.

Praxisbeispiel

Stellen Sie sich eine Anwendung vor, die die Temperatur in einem Raum aufzeichnet und diese Daten mithilfe von Kafka übermittelt, wo eine andere Anwendung sie verarbeitet. Der Einfachheit halber konzentrieren wir uns ausschlie?lich auf den Kafka-Aspekt, wobei sowohl der Produzent als auch der Verbraucher in derselben Anwendung implementiert werden. In diesem Szenario stellt jede aufgezeichnete Temperatur zu einem bestimmten Zeitpunkt ein Ereignis dar:

{
  temperature: 42,
  timeStamp: new Date(),
};

Der gesamte Code befindet sich in diesem Repository.

Zuerst brauchen wir einen Kafka-Broker, aber anstatt Kafka auf unserem Computer zu installieren, verwenden wir einfach dieses Docker-Kafka-Image.

Beginnen Sie mit dem Ziehen dieses Bildes:

Docker Pull Apache/Kafka

Dann führen Sie es aus und ordnen den Port zu, den Kafka auf demselben Port auf unserem Computer abh?rt:

docker run -d -p 9092:9092 --name Broker Apache/kafka:latest

Das ist es, wir haben einen Kafka-Broker im Einsatz. Bevor Sie fortfahren, m?chten Sie vielleicht damit herumspielen, indem Sie Themen erstellen, Nachrichten senden und konsumieren. Befolgen Sie dazu einfach die Anweisungen auf dieser Bildseite.

Um unsere Anwendung zu erstellen, verwenden wir NestJS mit KafkaJS. Beginnen Sie mit der Erstellung der App mit Nest CLI

Nest neues My-Nest-Projekt

Im Projektordner installieren Sie kafkajs

npm i kafkajs

Und generieren Sie die folgenden Module

nest g mo kafka

Nest G MO Producer

Nest G Mo Consumer

Nest-G-Mo-Temperatur

Das Kafka-Modul übernimmt alle Kafka-spezifischen Vorg?nge, einschlie?lich der Verwaltung von Verbraucher- und Produzentenklassen zum Verbinden, Trennen, Senden und Empfangen von Nachrichten. Dies wird das einzige Modul sein, das direkt mit dem kafkajs-Paket interagiert.

Die Produzenten- und Verbrauchermodule fungieren als Schnittstellen zwischen der Pub-Sub-Plattform (in diesem Fall Kafka) und dem Rest der Anwendung und abstrahieren plattformspezifische Details.

Das Temperaturmodul verwaltet die Ereignisse. Es muss nicht wissen, welche Pub-Sub-Plattform verwendet wird, es sind lediglich ein Verbraucher und ein Produzent erforderlich, um zu funktionieren.

Nachdem wir die Module erstellt haben, erstellen wir auch einen Ordner src/interface und fügen darin die folgenden Schnittstellen hinzu:

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

Fügen Sie im Ordner src/kafka/ die Producer- und Consumer-Klassen hinzu, die diese Schnittstellen implementieren:

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

Vergessen Sie nicht, diese Klassen in kafka.module.ts zu exportieren

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

So wie es jetzt ist, k?nnten wir zum Temperaturmodul gehen und diese Kafka-Klassen instanziieren und beginnen, sie zu verwenden. Allerdings w?re es besser, wenn sich das Temperaturmodul keine Gedanken darüber machen müsste, welche Pub-Sub-Plattform es verwendet. Stattdessen sollte es einfach mit einem injizierten Produzenten und/oder Konsumenten zusammenarbeiten und sich ausschlie?lich auf das Senden und Empfangen von Nachrichten konzentrieren, unabh?ngig von der zugrunde liegenden Plattform. Wenn wir uns in Zukunft dazu entschlie?en, auf eine andere Pub-Sub-Plattform zu wechseln, müssen wir auf diese Weise keine ?nderungen am Temperaturmodul vornehmen.

Um diese Abstraktion zu erreichen, k?nnen wir Producer- und Consumer-Klassen erstellen, die die Besonderheiten der Producer- und Consumer-Implementierungen von Kafka verarbeiten. Beginnen wir mit dem Produzenten:

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}

Nun der Verbraucher:

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}

Jetzt k?nnen wir uns auf den Aufbau des Temperaturmoduls konzentrieren. In der Datei ?temperature.service.ts“ erstellen wir eine Methode zum Registrieren einer Temperatur, die in diesem Beispiel einfach die Temperaturdaten über einen Produzenten an den Broker sendet. Darüber hinaus implementieren wir zu Demonstrationszwecken eine Methode zur Verarbeitung eingehender Nachrichten.

Diese Methoden k?nnen von einem anderen Dienst oder einem Controller aufgerufen werden. Der Einfachheit halber rufen wir sie in diesem Beispiel jedoch direkt beim Start der Anwendung auf, indem wir die Methode onModuleInit verwenden.

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

Das ist es! Wenn der Broker im Docker-Container ausgeführt wird, k?nnen Sie die Anwendung zum Senden und Empfangen von Nachrichten starten. Darüber hinaus k?nnen Sie mit dem folgenden Befehl eine Shell im Broker-Container ?ffnen:

Docker Exec --workdir /opt/kafka/bin/ -it Broker Sh

Von dort aus k?nnen Sie direkt mit dem Broker interagieren und Nachrichten an die Anwendung senden, Nachrichten von ihr empfangen, neue Themen erstellen usw.

Dies ist das Repository mit dem Code dieses Beispiels.

Das obige ist der detaillierte Inhalt vonKafka-Grundlagen mit einem praktischen Beispiel. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Erkl?rung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn

Hei?e KI -Werkzeuge

Undress AI Tool

Undress AI Tool

Ausziehbilder kostenlos

Undresser.AI Undress

Undresser.AI Undress

KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover

AI Clothes Remover

Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Clothoff.io

Clothoff.io

KI-Kleiderentferner

Video Face Swap

Video Face Swap

Tauschen Sie Gesichter in jedem Video mühelos mit unserem v?llig kostenlosen KI-Gesichtstausch-Tool aus!

Hei?e Werkzeuge

Notepad++7.3.1

Notepad++7.3.1

Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version

SublimeText3 chinesische Version

Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1

Senden Sie Studio 13.0.1

Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6

Dreamweaver CS6

Visuelle Webentwicklungstools

SublimeText3 Mac-Version

SublimeText3 Mac-Version

Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Java vs. JavaScript: Die Verwirrung beseitigen Java vs. JavaScript: Die Verwirrung beseitigen Jun 20, 2025 am 12:27 AM

Java und JavaScript sind unterschiedliche Programmiersprachen, die jeweils für verschiedene Anwendungsszenarien geeignet sind. Java wird für die Entwicklung gro?er Unternehmen und mobiler Anwendungen verwendet, w?hrend JavaScript haupts?chlich für die Entwicklung von Webseiten verwendet wird.

Mastering JavaScript -Kommentare: Ein umfassender Leitfaden Mastering JavaScript -Kommentare: Ein umfassender Leitfaden Jun 14, 2025 am 12:11 AM

CommentareAtecrucialinjavaScriptFormaintainingClarity und FosteringCollaboration.1) thehelpindebugging, Onboarding, und die Verst?rkung vonCodeevolution.2) Verwendungsle-Linien- und Verst??en für FosterquickexPlanations und Multi-LinecomMentSfordsetaileddescriptions.3) Bestpraktiziert

JavaScript -Kommentare: Kurzer Erl?uterung JavaScript -Kommentare: Kurzer Erl?uterung Jun 19, 2025 am 12:40 AM

JavaScriptComents AreseessentialFormaintaining, Lesen und GuidingCodeexexecution.1) einzelne Linecommments Arequickickexplanationen.2) Multi-LindexplainComproxlogicorProvedetailedDocumentation.3) InlinecommentsclarifyspecificPartsosensofCode.BestPracticic

JavaScript -Datentypen: Ein tiefer Tauchgang JavaScript -Datentypen: Ein tiefer Tauchgang Jun 13, 2025 am 12:10 AM

JavascripThasseveralprimitedatatypes: Zahl, String, Boolean, undefined, Null, Symbol und Bigint und Non-Primitivetypes LikeObjectandArray.Ververst?ndnisTheSeisScricialForwritingEffiction, Bug-Freecode: 1) numberusesa64-Bitformat, führend, führend, führend, pointofointofloatieren-pointoinssli

JavaScript vs. Java: Ein umfassender Vergleich für Entwickler JavaScript vs. Java: Ein umfassender Vergleich für Entwickler Jun 20, 2025 am 12:21 AM

JavaScriptispreferredforwebdevelopment,whileJavaisbetterforlarge-scalebackendsystemsandAndroidapps.1)JavaScriptexcelsincreatinginteractivewebexperienceswithitsdynamicnatureandDOMmanipulation.2)Javaoffersstrongtypingandobject-orientedfeatures,idealfor

Wie arbeite man mit Daten und Zeiten in JS? Wie arbeite man mit Daten und Zeiten in JS? Jul 01, 2025 am 01:27 AM

Die folgenden Punkte sollten bei der Verarbeitung von Daten und Zeiten in JavaScript festgestellt werden: 1. Es gibt viele M?glichkeiten, Datumsobjekte zu erstellen. Es wird empfohlen, ISO -Format -Zeichenfolgen zu verwenden, um die Kompatibilit?t sicherzustellen. 2. Die Zeitinformationen erhalten und festlegen k?nnen und setzen Sie Methoden fest, und beachten Sie, dass der Monat mit 0 beginnt. 3. Die manuell formatierende Daten sind Zeichenfolgen erforderlich, und auch Bibliotheken von Drittanbietern k?nnen verwendet werden. 4. Es wird empfohlen, Bibliotheken zu verwenden, die Zeitzonen wie Luxon unterstützen. Das Beherrschen dieser wichtigen Punkte kann h?ufige Fehler effektiv vermeiden.

JavaScript: Datentypen zur effizienten Codierung untersuchen JavaScript: Datentypen zur effizienten Codierung untersuchen Jun 20, 2025 am 12:46 AM

JavaScripthassevenfundamentaldatatypes:number,string,boolean,undefined,null,object,andsymbol.1)Numbersuseadouble-precisionformat,usefulforwidevaluerangesbutbecautiouswithfloating-pointarithmetic.2)Stringsareimmutable,useefficientconcatenationmethodsf

Warum sollten Sie  Tags am Ende des  platzieren? Warum sollten Sie Tags am Ende des platzieren? Jul 02, 2025 am 01:22 AM

PlatztagsattheBottomofabogpostorwebpageServeSpracticalPurposesforseo, Usexperience und design.1ithelpswithseobyallowingEnginestoaccessKeyword-relevantTagswithoutClutteringHemainContent.2.

See all articles