国产av日韩一区二区三区精品,成人性爱视频在线观看,国产,欧美,日韩,一区,www.成色av久久成人,2222eeee成人天堂

Kafka の基礎(chǔ)と実際の例

Dec 28, 2024 am 09:26 AM

ここ數(shù)週間、私は Kafka を詳しく調(diào)べ、途中でメモを取ってきました。それを整理してブログ投稿として構(gòu)成することにしました。概念やヒントとは別に、Kafka で構(gòu)築された実踐的な例があります。 NestJS と KafkaJs。

カフカとは何ですか?

Apache Kafka は、リアルタイム イベントを処理するように設(shè)計(jì)された分散イベント ストリーミング プラットフォームです。大規(guī)模で高スループット、低遅延のデータ ストリームの保存、処理、取得が可能となり、リアルタイム データ パイプラインやイベント駆動(dòng)型アプリケーションの構(gòu)築に適しています。

主な特徴:

  • イベント ストリーミング: Kafka は、データを トピック に整理します。
  • トピックは、イベントの順序付けされたログです。
  • 分散アーキテクチャ: Kafka は、スケーラビリティと耐障害性を考慮して構(gòu)築されています。ブローカーと呼ばれるノードのクラスター
  • として動(dòng)作し、複數(shù)のサーバーにデータを分散できます。
  • パブリッシュ-サブスクライブ モデル: プロデューサートピックにメッセージを書(shū)き込み、コンシューマー
  • はトピックからメッセージを読み取ります。 Kafka は複數(shù)のコンシューマをサポートし、異なるアプリケーションが同じデータ ストリームを獨(dú)立して処理できるようにします。
  • 高パフォーマンス:
  • Kafka は高スループット向けに最適化されており、1 秒あたり數(shù)百萬(wàn)のメッセージを低遅延で処理します。
  • 永続ストレージ:
  • Kafka は、構(gòu)成可能な保持期間でメッセージをディスクに保存し、データの永続性と信頼性を確保します。
  • パーティショニングとレプリケーション: トピックはスケーラビリティのために パーティション
  • に分割され、フォールト トレランスのためにブローカー間でレプリケートされます。
  • 再生可能性: コンシューマは、オフセット
  • をリセットすることでメッセージを再読み取りでき、データの再処理または回復(fù)が可能になります。
  • 統(tǒng)合とエコシステム:
  • Kafka はさまざまなシステムと統(tǒng)合し、Kafka Connect (データ統(tǒng)合用) や Kafka Streams (ストリーム処理用) などのツールを備えています。

利點(diǎn)
  • 信頼性:
  • データの分散、レプリケーション、パーティション化を通じてフォールト トレランスを保証します。
  • スケーラビリティ:
  • Kafka は大量のデータを処理し、ダウンタイムなしで水平方向に拡張できます。
  • 耐久性:
  • メッセージは即座に保存され、回復(fù)力とデータの永続性が保証されます。
  • パフォーマンス:
  • Kafka は、極端なデータ負(fù)荷下でも高いパフォーマンスを維持し、ダウンタイムやデータ損失なしで大量のデータを処理します。

短所

これらのトレードオフは、Kafka のパフォーマンスを最大化するための意図的な設(shè)計(jì)上の選択ですが、より高い柔軟性が必要なユースケースでは課題が生じる可能性があります。
  • 柔軟性の制限: Kafka には、レポート內(nèi)の特定のデータのフィルタリングなどの拡張クエリのサポートがありません。 Kafka は受信した順序でオフセットによってメッセージを取得するため、コンシューマはこれらのタスクを処理する必要があります。
  • 長(zhǎng)期保存用に設(shè)計(jì)されていない: Kafka はデータのストリーミングには優(yōu)れていますが、履歴データを長(zhǎng)期間保存するのには適していません。データの重複により、大規(guī)模なデータセットのストレージのコストが高くなる可能性があります。
  • ワイルドカード トピックのサポートなし: Kafka では、ワイルドカード パターン (log-2024-* など) を使用して複數(shù)のトピックから消費(fèi)することはできません。

ユースケース

  • リアルタイム分析: データ ストリームが発生するたびに処理および分析します。
  • イベント ソーシング: アプリケーションの狀態(tài)に対するすべての変更を一連のイベントとして記録します。
  • ログ集約: 分散システムからログを収集および管理します。
  • データ パイプライン: システム間でデータを確実かつ効率的にストリーミングします。
  • IoT アプリケーション: IoT デバイスからのセンサー データをリアルタイムで処理します。

カフカはどのように機(jī)能しますか?

Kafka は、キューイング メッセージング モデルとパブリッシュ/サブスクライブ メッセージング モデルの両方の機(jī)能を統(tǒng)合し、消費(fèi)者にそれぞれのアプローチの利點(diǎn)を提供します。

  • キューは、複數(shù)のコンシューマ インスタンスにタスクを分散することでスケーラブルなデータ処理を可能にしますが、従來(lái)のキューは複數(shù)のサブスクライバをサポートしません。
  • パブリッシュ-サブスクライブ モデルは複數(shù)のサブスクライバーをサポートしますが、各メッセージがすべてのサブスクライバーに送信されるため、複數(shù)のワーカー プロセスにタスクを分散することはできません。

Kafka はパーティション化されたログ システムを採(cǎi)用して、キューイングとパブリッシュ/サブスクライブ モデルの利點(diǎn)を組み合わせています。順序付けられた一連のレコードであるログはパーティションに分割され、各パーティションが異なるサブスクライバ (コンシューマ) に割り當(dāng)てられます。この設(shè)定により、複數(shù)のサブスクライバがスケーラビリティを維持しながらトピックを共有できるようになります。

Kafka fundamentals with a practical example

イベント、トピック、パーティション

Kafka がリアルタイム イベントを処理するように設(shè)計(jì)されたプラットフォームであることは見(jiàn)てきましたが、これらのイベントがどのように処理されるかを説明する前に、それらのイベントを定義する必要があります。

イベントとは、支払い、Web サイトのクリック、溫度測(cè)定など、アプリケーションに記録されたアクション、インシデント、または変更です。

Kafka の

イベント は、キー/値 ペアとしてモデル化され、キーと値の両方がバイト シーケンスにシリアル化されます。

  • は、多くの場(chǎng)合、シリアル化されたドメイン オブジェクト、またはセンサー出力やその他のアプリケーション データなどの生の入力を表します。これらは、Kafka イベントで送信されるコア情報(bào)をカプセル化します。
  • キー は複雑なドメイン オブジェクトにすることもできますが、多くの場(chǎng)合、文字列や整數(shù)などの単純なタイプになります。 (リレーショナル データベースの主キーのように) 個(gè)々のイベントを一意に識(shí)別するのではなく、通常、キーはシステム內(nèi)のエンティティ (特定のユーザー、注文、接続されたデバイスなど) を識(shí)別します。

Kafka は、イベントを トピック と呼ばれる 順序付けられたログ に整理します。外部システムが Kafka にイベントを書(shū)き込むと、イベントはトピックの末尾に追加されます。メッセージは、読み取られた後でも、設(shè)定可能な期間トピック內(nèi)に殘ります。キューとは異なり、トピックは耐久性、複製性、耐障害性に優(yōu)れており、イベント レコードを効率的に保存します。ただし、ログは順次スキャンのみ可能であり、クエリは実行できません。

トピックはログ ファイルとしてディスクに保存されますが、ディスクには有限のサイズや I/O などの制限があります。これを克服するために、Kafka ではトピックを パーティション に分割し、単一のログを複數(shù)のログに分割し、異なるサーバーに分散できるようにします。このパーティショニングにより、Kafka は水平方向に拡張できるようになり、大量のイベントと高スループットを処理する能力が強(qiáng)化されます。

Kafka は、キー:

があるかどうかに基づいてメッセージをパーティションに割り當(dāng)てます。
  • キーなし: メッセージはすべてのパーティションにラウンドロビンで分散され、均等なデータ分散が保証されますが、メッセージの順序は維持されません。
  • キーあり: パーティションはキーのハッシュによって決定され、同じキーを持つメッセージが常に同じパーティションに送られ、順序が維持されます。

ブローカー

Kafka は、集合的に Kafka クラスターを形成する ブローカー と呼ばれるノードを使用する分散データ インフラストラクチャとして動(dòng)作します。ブローカーは、ベアメタル ハードウェア、クラウド インスタンス、Kubernetes によって管理されるコンテナー、ラップトップ上の Docker、または JVM プロセスが実行できる場(chǎng)所ならどこでも実行できます。

ブローカーは以下に焦點(diǎn)を當(dāng)てます:

  • 新しいイベントをパーティションに書(shū)き込みます。
  • パーティションからの読み取りを提供します。
  • ブローカー間でパーティションをレプリケートします。

メッセージの計(jì)算やトピック間のルーティングを?qū)g行せず、設(shè)計(jì)をシンプルかつ効率的に保ちます。

レプリケーション

Kafka は、複數(shù)のブローカー間でパーティション データをレプリケートすることにより、データの耐久性とフォールト トレランスを保証します。パーティションのプライマリ コピーは リーダー レプリカ であり、追加のコピーは フォロワー レプリカ です。データはリーダーに書(shū)き込まれ、更新內(nèi)容が自動(dòng)的にフォロワーにレプリケートされます。

このレプリケーション プロセスにより、次のことが保証されます。

  • ブローカーやストレージに障害が発生した場(chǎng)合でもデータを安全に保ちます。
  • 自動(dòng)フェイルオーバー?,F(xiàn)在のリーダーに障害が発生した場(chǎng)合、別のレプリカがリーダーとして引き継ぎます。

Kafka はレプリケーションを透過(guò)的に処理するため、開(kāi)発者はレプリケーションを直接管理する必要がなく、これらの保証の恩恵を受けられます。

プロデューサー

Kafka プロデューサー は、データを Kafka トピック に送信 (パブリッシュ) するクライアント アプリケーションです。メッセージ (レコード) を作成し、Kafka クラスター に送信する役割を果たします。プロデューサーは、構(gòu)成とメッセージ キーの存在に基づいて、メッセージが保存される トピックパーティション を決定します。プロデューサーは次のことに責(zé)任を負(fù)いますが、これに限定されません:

  • メッセージの構(gòu)成:
    • 各メッセージは、キー (オプション)、値 (実際のデータ)、およびメタデータで構(gòu)成されます。
    • キーはメッセージのパーティションを決定し、同じキーを持つメッセージの順序を保証します。
  • パーティションの割り當(dāng)て:
    • キーが指定されている場(chǎng)合、プロデューサーはハッシュ アルゴリズムを使用してパーティションを決定します。
    • キーを使用しない場(chǎng)合、メッセージは負(fù)荷分散のためにラウンドロビン方式でパーティション全體に分散されます。
  • 圧縮:

    プロデューサーはメッセージを圧縮して、ネットワーク帯域幅とストレージの使用量を削減できます。

消費(fèi)者

Kafka コンシューマー は、Kafka トピック からメッセージを読み取るクライアント アプリケーションです。

は獨(dú)自のペースで Kafka パーティションからメッセージを取得し、データのリアルタイムまたはバッチ処理を可能にします。 。 Kafka はコンシューマーにメッセージをプッシュするのではなく、データをリクエストすることで Kafka パーティションからメッセージをプルすることに注意してください。

消費(fèi)者は、処理したオフセットも追跡します。オフセットは自動(dòng)的にまたは手動(dòng)

でコミットでき、コンシューマーに障害が発生してもデータが失われないようにします。これにより、オフセットをリセットしてメッセージを再生するなど、柔軟な利用が可能になります。

消費(fèi)者団體

コンシューマ グループは、一部のトピックからのデータを協(xié)力して消費(fèi)するコンシューマのセットであり、これによりトピックのメッセージの分散処理が可能になります。

トピックのパーティションはグループ內(nèi)のコンシューマー間で分割され、各メッセージがグループ內(nèi)の 1 つのコンシューマーのみによって処理されるようにします。複數(shù)の消費(fèi)者グループは、干渉することなく同じトピックを獨(dú)立して利用できます。

新しいコンシューマがグループに參加するか、既存のコンシューマが失敗すると、Kafka はグループ內(nèi)のコンシューマ間でパーティションを再割り當(dāng)てし、すべてのパーティションが確実にカバーされるようにします。

シリアル化と逆シリアル化

Kafka のシリアル化と逆シリアル化は、送信および保存のために元の形式とバイト配列の間でデータを変換し、プロデューサーとコンシューマーが効率的に通信できるようにすることです。

連載

オブジェクトまたはデータ構(gòu)造をバイト ストリームに変換して、送信または保存できるようにするプロセスです。プロデューサは、Kafka トピックにデータを送信する前に、データ (キーと値) をバイト配列にシリアル化します。

一般的なシリアル化形式:

  • JSON: 人間が判読可能で、幅広い互換性があります。
  • Avro: コンパクトで効率的な、スキーマベース。
  • Protobuf: コンパクト、スキーマベース、言語(yǔ)に依存しない。
  • String: 単純なテキストベースのシリアル化。
  • カスタムシリアル化: アプリケーション固有のニーズ用。

逆シリアル化

逆のプロセスで、バイト ストリームが元のオブジェクトまたはデータ構(gòu)造に変換されます。コンシューマーが Kafka トピックからデータを読み取るとき、バイト配列をデシリアライズして処理に使用可能な形式に戻します。

圧縮

圧縮とは、メッセージを保存または送信する前にメッセージのサイズを減らすことです。プロデューサー、ブローカー、コンシューマー間でより小さなペイロードを送信することで、ストレージの使用量を最適化し、ネットワーク帯域幅の消費(fèi)を削減し、全體的なパフォーマンスを向上させます。

プロデューサーが Kafka トピックにメッセージを送信するとき、送信前にメッセージを圧縮できます。圧縮されたメッセージはブローカーにそのまま保存され、コンシューマがメッセージを読み取るときに解凍されます。

メリット

  • ネットワーク帯域幅の削減: ペイロードが小さくなると、ネットワーク上で送信されるデータが少なくなります。
  • ストレージ要件の低下: 圧縮されたメッセージは、ディスク上の占有スペースを減らします。
  • スループットの向上: メッセージが小さいため、より高速なデータ転送と処理が可能になります。

いつ使用しますか?

  • メッセージ サイズが大きいユースケース: 圧縮によりデータ サイズが大幅に削減されます。
  • 高スループット システム: ネットワークとストレージ リソースの負(fù)擔(dān)を軽減します。
  • バッチ処理: 圧縮は、プロデューサーが複數(shù)のメッセージをまとめてバッチ処理する場(chǎng)合に最適に機(jī)能します。

圧縮によりリソースが節(jié)約されますが、CPU 使用率と圧縮の利點(diǎn)との間のトレードオフのバランスをとり、ユースケースに合ったコーデックを選択することが重要です。

サポートされている圧縮タイプ

  • なし: 圧縮なし (デフォルト)。
  • Gzip: 圧縮率は高いですが、CPU 使用率が高くなります。
  • Snappy: 圧縮速度と CPU 使用率のバランスが取れており、リアルタイムのユースケースに適しています。
  • LZ4: 圧縮と解凍が高速化され、低遅延システム向けに最適化されています。
  • Zstd: Gzip よりも優(yōu)れたパフォーマンスを備えた高い圧縮率。新しい Kafka バージョンでサポートされています。

チューニング

Apache Kafka のパフォーマンスを最適化するには、さまざまなコンポーネントを微調(diào)整してスループットとレイテンシのバランスを効果的に調(diào)整する必要があります。この記事はこの主題の表面をなぞっただけです。Kafka をチューニングする際に考慮すべきいくつかの側(cè)面を以下に示します。

  • パーティション管理:

    • パーティション數(shù): パーティションの數(shù)を増やすと、並列処理とスループットが向上します。ただし、管理オーバーヘッドを防ぐために、過(guò)度のパーティションは避けてください。パーティションの數(shù)は、消費(fèi)者の能力と希望する消費(fèi)率に合わせて調(diào)整してください。
  • プロデューサー構(gòu)成:

    • バッチ処理: メッセージの効率的なバッチ処理を有効にして、リクエストの數(shù)を減らし、スループットを向上させるために、batch.size と linger.ms を構(gòu)成します。
    • 圧縮: 圧縮 (compression.type=snappy など) を?qū)g裝してメッセージ サイズを減らし、ネットワークとストレージの使用量を削減します。圧縮によって追加の CPU オーバーヘッドが発生することに注意してください。
  • コンシューマ構(gòu)成:

    • フェッチ設(shè)定: fetch.min.bytes と fetch.max.wait.ms を調(diào)整して、コンシューマがメッセージを取得する方法を制御し、アプリケーションのニーズに応じてレイテンシーとスループットのバランスをとります。

実踐例

部屋の溫度を記録し、このデータを Kafka を使用して送信し、別のアプリケーションがそれを処理するアプリケーションを想像してください。わかりやすくするために、プロデューサーとコンシューマーの両方が同じアプリケーション內(nèi)に実裝されている Kafka の側(cè)面にのみ焦點(diǎn)を當(dāng)てます。このシナリオでは、特定の瞬間に記録された各溫度がイベントを表します:

{
  temperature: 42,
  timeStamp: new Date(),
};

すべてのコードはこのリポジトリにあります。

まず、Kafka ブローカーが必要ですが、マシンに Kafka をインストールする代わりに、この Docker Kafka イメージだけをインストールしましょう。

その畫(huà)像をプルすることから始めます:

docker pull apache/kafka

次に、マシン上の同じポートで Kafka がリッスンするポートをマッピングして実行します。

docker run -d -p 9092:9092 --name Broker apache/kafka:latest

以上です。Kafka ブローカーが実行されています。続行する前に、トピックを作成し、メッセージを送信して消費(fèi)することで、Kafka ブローカーを試してみることができます。そのためには、そのイメージ ページの指示に従ってください。

アプリケーションを構(gòu)築するには、NestJS と KafkaJS を使用します。まず、Nest CLI でアプリを作成します

ネスト新しい my-nest-プロジェクト

プロジェクトフォルダー內(nèi)にkafkajsをインストールします

npm i kafkajs

そして次のモジュールを生成します

ネスト?ジー?モ?カフカ

ネストGモプロデューサー

nest g mo コンシューマー

巣鴨溫度

Kafka モジュール は、メッセージの接続、切斷、送信、受信のためのコンシューマー クラスとプロデューサー クラスの管理を含む、Kafka 固有のすべての操作を処理します。これは、kafkajs パッケージと直接対話(huà)する唯一のモジュールになります。

プロデューサー モジュールとコンシューマー モジュールは、パブリッシュ/サブスクライブ プラットフォーム (この場(chǎng)合は Kafka) とアプリケーションの殘りの部分の間のインターフェイスとして機(jī)能し、プラットフォーム固有の詳細(xì)を抽象化します。

溫度モジュールがイベントを管理します。どのパブリッシュ/サブスクライブ プラットフォームが使用されているかを知る必要はなく、コンシューマーとプロデューサーが機(jī)能することだけが必要です。

モジュールを作成したら、フォルダー src/interface も作成し、その中に次のインターフェースを追加しましょう:

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

src/kafka/ フォルダーに、これらのインターフェイスを?qū)g裝するプロデューサー クラスとコンシューマー クラスを追加します。

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

これらのクラスを kafka.module.ts にエクスポートすることを忘れないでください

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

現(xiàn)狀では、溫度モジュールに移動(dòng)して、これらの Kafka クラスをインスタンス化し、使用を開(kāi)始できます。ただし、溫度モジュールがどのパブリッシュ/サブスクライブ プラットフォームを使用しているかを気にする必要がなければ、より良いでしょう。代わりに、基盤(pán)となるプラットフォームに関係なく、メッセージの送受信のみに焦點(diǎn)を當(dāng)て、挿入されたプロデューサーやコンシューマーと単純に連攜する必要があります。こうすることで、將來(lái)別のパブリッシュ/サブスクライブ プラットフォームに切り替えることを決定した場(chǎng)合でも、溫度モジュールに変更を加える必要がなくなります。

この抽象化を?qū)g現(xiàn)するには、Kafka のプロデューサーおよびコンシューマー実裝の詳細(xì)を処理するプロデューサー クラスとコンシューマー クラスを作成できます。プロデューサーから始めましょう:

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}

さて、消費(fèi)者は:

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}

ここで、溫度モジュールの構(gòu)築に集中できます。溫度.service.ts ファイルで、溫度を登録するメソッドを作成します。この例では、プロデューサーを使用して溫度データをブローカーに送信するだけです。さらに、デモ目的で受信メッセージを処理するメソッドを?qū)g裝します。

これらのメソッドは、別のサービスまたはコントローラーから呼び出すことができます。ただし、簡(jiǎn)単にするために、この例では、アプリケーションの起動(dòng)時(shí)に onModuleInit メソッドを利用して直接呼び出します。

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

それだけです! Docker コンテナ內(nèi)でブローカーを?qū)g行すると、アプリケーションを起動(dòng)してメッセージを送受信できます。さらに、次のコマンドを使用して、ブローカー コンテナ內(nèi)でシェルを開(kāi)くことができます:

docker exec --workdir /opt/kafka/bin/ -it Broker sh

そこから、ブローカーと直接対話(huà)し、アプリケーションにメッセージを送信したり、アプリケーションからメッセージを受信したり、新しいトピックを作成したりすることができます。

これは、この例のコードが含まれるリポジトリです。

以上がKafka の基礎(chǔ)と実際の例の詳細(xì)內(nèi)容です。詳細(xì)については、PHP 中國(guó)語(yǔ) Web サイトの他の関連記事を參照してください。

このウェブサイトの聲明
この記事の內(nèi)容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰屬します。このサイトは、それに相當(dāng)する法的責(zé)任を負(fù)いません。盜作または侵害の疑いのあるコンテンツを見(jiàn)つけた場(chǎng)合は、admin@php.cn までご連絡(luò)ください。

ホットAIツール

Undress AI Tool

Undress AI Tool

脫衣畫(huà)像を無(wú)料で

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード寫(xiě)真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

寫(xiě)真から衣服を削除するオンライン AI ツール。

Clothoff.io

Clothoff.io

AI衣類(lèi)リムーバー

Video Face Swap

Video Face Swap

完全無(wú)料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡(jiǎn)単に交換できます。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無(wú)料のコードエディター

SublimeText3 中國(guó)語(yǔ)版

SublimeText3 中國(guó)語(yǔ)版

中國(guó)語(yǔ)版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強(qiáng)力な PHP 統(tǒng)合開(kāi)発環(huán)境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開(kāi)発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

Java vs. JavaScript:混亂を解消します Java vs. JavaScript:混亂を解消します Jun 20, 2025 am 12:27 AM

JavaとJavaScriptは異なるプログラミング言語(yǔ)であり、それぞれ異なるアプリケーションシナリオに適しています。 Javaは大規(guī)模なエンタープライズおよびモバイルアプリケーション開(kāi)発に使用されますが、JavaScriptは主にWebページ開(kāi)発に使用されます。

JavaScriptのマスターコメント:包括的なガイド JavaScriptのマスターコメント:包括的なガイド Jun 14, 2025 am 12:11 AM

ContureCrucialInjavascript formantaining andFosteringCollaboration.1)TheypindeBugging、Onboarding、およびUnderstandingCodeevolution.2)usesingle-linecomments for quickexplanations andmulti-linecomments fordeTeTaileddespransions.3)BestPractsinclud

JavaScriptコメント:短い説明 JavaScriptコメント:短い説明 Jun 19, 2025 am 12:40 AM

JavaScriptcommentsEareEssentialential-formaining、およびGuidingCodeexecution.1)single-linecommentseared forquickexplanations.2)多LinecommentsexplaincomplexlogiCorprovidededocumentation.3)clarifyspartsofcode.bestpractic

JavaScriptデータ型:ディープダイビング JavaScriptデータ型:ディープダイビング Jun 13, 2025 am 12:10 AM

javascripthasseveralprimitivedatypes:number、string、boolean、undefined、null、symbol、andbigint、andnon-primitiveTypeslike objectandarray

JavaScript vs. Java:開(kāi)発者向けの包括的な比較 JavaScript vs. Java:開(kāi)発者向けの包括的な比較 Jun 20, 2025 am 12:21 AM

javascriptispreferredforwebdevelopment、whilejavaisbetterforlge-scalebackendsystemsandroidapps.1)javascriptexcelsininintingtivewebexperiences withitsdynAmicnature anddommanipulation.2)javaofferstruntypyping-dobject-reientedpeatures

JSで日付と時(shí)間を操作する方法は? JSで日付と時(shí)間を操作する方法は? Jul 01, 2025 am 01:27 AM

JavaScriptで日付と時(shí)間を処理する場(chǎng)合は、次の點(diǎn)に注意する必要があります。1。日付オブジェクトを作成するには多くの方法があります。 ISO形式の文字列を使用して、互換性を確保することをお?jiǎng)幛幛筏蓼埂?2。時(shí)間情報(bào)を取得および設(shè)定して、メソッドを設(shè)定でき、月は0から始まることに注意してください。 3.手動(dòng)でのフォーマット日付には文字列が必要であり、サードパーティライブラリも使用できます。 4.ルクソンなどのタイムゾーンをサポートするライブラリを使用することをお?jiǎng)幛幛筏蓼埂¥长欷椁沃匾圣荪ぅ螗趣蛄?xí)得すると、一般的な間違いを効果的に回避できます。

JavaScript:効率的なコーディングのためのデータ型の調(diào)査 JavaScript:効率的なコーディングのためのデータ型の調(diào)査 Jun 20, 2025 am 12:46 AM

javascripthassevenfundamentaldatypes:number、string、boolean、undefined、null、object、andsymbol.1)numberseadouble-precisionformat、有用であるため、有用性の高いものであるため、but-for-loating-pointarithmetic.2)ストリングリムムット、使用率が有用であること

なぜの下部にタグを配置する必要があるのですか? なぜの下部にタグを配置する必要があるのですか? Jul 02, 2025 am 01:22 AM

PLACSTHETTHETTHE BOTTOMOFABLOGPOSTORWEBPAGESERVESPAGESPORCICALPURPOSESESFORSEO、userexperience、andDesign.1.IthelpswithiobyAllowingseNStoAccessKeysword-relevanttagwithtagwithtagwithtagwithemaincontent.2.iTimrovesexperiencebyepingepintepepinedeeping

See all articles