国产av日韩一区二区三区精品,成人性爱视频在线观看,国产,欧美,日韩,一区,www.成色av久久成人,2222eeee成人天堂

首頁(yè) web前端 js教程 Kafka 基礎(chǔ)知識(shí)與實(shí)際範(fàn)例

Kafka 基礎(chǔ)知識(shí)與實(shí)際範(fàn)例

Dec 28, 2024 am 09:26 AM

在過(guò)去的幾周里,我一直在深入研究Kafka 並一路做筆記,我決定組織和構(gòu)建一篇博客文章,在上面,除了概念和技巧之外,還有一個(gè)使用構(gòu)建的實(shí)際示例NestJS和KafkaJs。

卡夫卡是什麼?

Apache Kafka 是一個(gè)分散式事件流平臺(tái),旨在處理即時(shí)事件。它能夠儲(chǔ)存、處理和檢索大規(guī)模、高吞吐量、低延遲的資料流,使其適合建立即時(shí)資料管道和事件驅(qū)動(dòng)的應(yīng)用程式。

主要特點(diǎn):

  • 事件流: Kafka 將資料組織成主題,它們是事件的有序日誌。
  • 分散式架構(gòu):Kafka 是為了可擴(kuò)展性和容錯(cuò)能力而建構(gòu)的。它作為稱(chēng)為代理的節(jié)點(diǎn)叢集運(yùn)行,可以跨多個(gè)伺服器分發(fā)資料。
  • 發(fā)布-訂閱模型:生產(chǎn)者將訊息寫(xiě)入主題消費(fèi)者從中讀取訊息。 Kafka支援多個(gè)消費(fèi)者,讓不同的應(yīng)用程式獨(dú)立處理同一個(gè)資料流。
  • 高效能: Kafka 針對(duì)高吞吐量進(jìn)行了最佳化,每秒處理數(shù)百萬(wàn)則訊息,延遲較低。
  • 持久儲(chǔ)存: Kafka 將訊息儲(chǔ)存在磁碟上,保留期限可配置,確保資料持久性和可靠性。
  • 分區(qū)與複製:主題分為分區(qū)以實(shí)現(xiàn)可擴(kuò)展性,並跨代理進(jìn)行複製以實(shí)現(xiàn)容錯(cuò)。
  • 可重玩性:消費(fèi)者可以透過(guò)重置其偏移量來(lái)重新讀取訊息,從而啟用資料重新處理或復(fù)原。
  • 集成和生態(tài)系統(tǒng): Kafka 與各種系統(tǒng)集成,並擁有 Kafka Connect(用於數(shù)據(jù)集成)和 Kafka Streams(用於流處理)等工具。

優(yōu)點(diǎn)

  • 可靠性:透過(guò)資料分發(fā)、複製和分區(qū)確保容錯(cuò)。
  • 可擴(kuò)充性:Kafka 可以處理大量資料並水平擴(kuò)展而無(wú)需停機(jī)。
  • 持久性:訊息被及時(shí)存儲(chǔ),確保彈性和資料持久性。
  • 效能:Kafka 在極端資料負(fù)載下保持高效能,處理大量資料而不會(huì)造成停機(jī)或資料遺失。

缺點(diǎn)

這些權(quán)衡是有意的設(shè)計(jì)選擇,旨在最大限度地提高 Kafka 的性能,但可能會(huì)給需要更大靈活性的用例帶來(lái)挑戰(zhàn):

  • 靈活性有限: Kafka 缺乏對(duì)擴(kuò)展查詢(xún)的支持,例如過(guò)濾報(bào)告中的特定資料。消費(fèi)者必須處理這些任務(wù),因?yàn)?Kafka 按訊息接收順序的偏移量檢索訊息。
  • 不適合長(zhǎng)期儲(chǔ)存:Kafka 擅長(zhǎng)串流數(shù)據(jù),但不適合長(zhǎng)期儲(chǔ)存歷史資料。資料重複會(huì)使大型資料集的儲(chǔ)存成本高昂。
  • 無(wú)通配符主題支援: Kafka 不允許使用通配符模式(例如 log-2024-*)從多個(gè)主題進(jìn)行消費(fèi)。

使用案例

  • 即時(shí)分析:在資料流發(fā)生時(shí)進(jìn)行處理與分析。
  • 事件溯源: 將應(yīng)用程式狀態(tài)的所有變更記錄為事件序列。
  • 日誌聚合:從分散式系統(tǒng)收集和管理日誌。
  • 資料管道:可靠且有效率地在系統(tǒng)之間傳輸資料。
  • 物聯(lián)網(wǎng)應(yīng)用:即時(shí)處理來(lái)自物聯(lián)網(wǎng)設(shè)備的感測(cè)器資料。

卡夫卡如何運(yùn)作?

Kafka 整合了隊(duì)列和發(fā)布-訂閱訊息傳遞模型的功能,為消費(fèi)者提供每種方法的優(yōu)勢(shì)。

  • 佇列 透過(guò)在多個(gè)消費(fèi)者實(shí)例之間分配任務(wù)來(lái)實(shí)現(xiàn)可擴(kuò)展的資料處理,但傳統(tǒng)佇列不支援多個(gè)訂閱者。
  • 發(fā)布-訂閱模型支援多個(gè)訂閱者,但無(wú)法在多個(gè)工作進(jìn)程之間分配任務(wù),因?yàn)槊總€(gè)訊息都會(huì)發(fā)送給所有訂閱者。

Kafka 採(cǎi)用分區(qū)日誌系統(tǒng)來(lái)結(jié)合佇列和發(fā)布-訂閱模型的優(yōu)點(diǎn)。日誌是有序的記錄序列,被分成多個(gè)分區(qū),每個(gè)分區(qū)分配給不同的訂閱者(消費(fèi)者)。此設(shè)定使多個(gè)訂閱者能夠共享一個(gè)主題,同時(shí)保持可擴(kuò)展性。

Kafka fundamentals with a practical example

活動(dòng)、主題和分區(qū)

我們已經(jīng)看到 Kafka 是一個(gè)旨在處理即時(shí)事件的平臺(tái),在討論如何處理這些事件之前,我們需要對(duì)它們進(jìn)行定義:

事件是記錄應(yīng)用程式的操作、事件或更改,例如付款、網(wǎng)站點(diǎn)擊或溫度讀數(shù)。

Kafka 中的

事件被建模為鍵/值對(duì),其中鍵和值都被序列化為位元組序列。

  • 通常表示序列化的域物件或原始輸入,例如感測(cè)器輸出或其他應(yīng)用程式資料。它們封裝了 Kafka 事件中傳輸?shù)暮诵挠嵪ⅰ?
  • 可以是複雜的域?qū)ο?,但通常是?jiǎn)單的類(lèi)型,如字串或整數(shù)。鍵通常標(biāo)識(shí)系統(tǒng)內(nèi)的實(shí)體,例如特定使用者、訂單或連接的設(shè)備,而不是唯一標(biāo)識(shí)單一事件(如關(guān)聯(lián)式資料庫(kù)中的主鍵)。

Kafka 將事件組織成有序日誌,稱(chēng)為主題。當(dāng)外部系統(tǒng)將事件寫(xiě)入 Kafka 時(shí),它會(huì)被附加到主題的末端。即使在閱讀後,訊息也會(huì)在主題中保留可配置的持續(xù)時(shí)間。與佇列不同,主題具有持久性、可複製性和容錯(cuò)性,可以有效地儲(chǔ)存事件記錄。但日誌只能順序掃描,不能查詢(xún)。

主題作為日誌檔案儲(chǔ)存在磁碟上,但是磁碟具有有限的大小和 I/O 等限制。為了克服這個(gè)問(wèn)題,Kafka 允許主題分為分區(qū),將單一日誌分解為多個(gè)可以分佈在不同伺服器上的日誌。這種分區(qū)使 Kafka 能夠水平擴(kuò)展,增強(qiáng)其處理大量事件和高吞吐量的能力。

Kafka 根據(jù)分割區(qū)是否有 key:

將訊息分配給分割區(qū)
  • 無(wú)鍵:訊息在所有分割區(qū)之間循環(huán)分發(fā),確保資料均勻分佈,但不保留訊息順序。
  • With Key: 分區(qū)是透過(guò)對(duì) key 進(jìn)行哈希處理來(lái)確定的,確保具有相同 key 的訊息始終進(jìn)入相同的分區(qū)並保持其順序。

經(jīng)紀(jì)人

Kafka 使用名為 brokers 的節(jié)點(diǎn)作為分散式資料基礎(chǔ)設(shè)施運(yùn)行,這些節(jié)點(diǎn)共同形成 Kafka 叢集。代理程式可以在裸機(jī)硬體、雲(yún)端執(zhí)行個(gè)體、Kubernetes 管理的容器中、筆記型電腦上的 Docker 或任何可以執(zhí)行 JVM 程序的地方運(yùn)作。

經(jīng)紀(jì)商關(guān)注:

  • 將新事件寫(xiě)入分割區(qū)。
  • 從分割區(qū)讀取服務(wù)。
  • 跨代理複製分區(qū)。

它們不執(zhí)行訊息計(jì)算或主題到主題的路由,從而保持設(shè)計(jì)簡(jiǎn)單且有效率。

複製

Kafka 透過(guò)跨多個(gè)代理複製分區(qū)資料來(lái)確保資料的持久性和容錯(cuò)性。分區(qū)的主要副本是領(lǐng)導(dǎo)副本,而其他副本是跟隨副本。資料被寫(xiě)入領(lǐng)導(dǎo)者,領(lǐng)導(dǎo)者自動(dòng)將更新複製到追隨者。

此複製過(guò)程可確保:

  • 資料安全,即使在代理程式或儲(chǔ)存發(fā)生故障的情況下也是如此。
  • 自動(dòng)故障轉(zhuǎn)移,如果當(dāng)前領(lǐng)導(dǎo)者失敗,另一個(gè)副本將接管作為領(lǐng)導(dǎo)者。

開(kāi)發(fā)人員可以從這些保證中受益,而無(wú)需直接管理複製,因?yàn)?Kafka 會(huì)透明地處理它。

製片人

Kafka 生產(chǎn)者 是一個(gè)客戶(hù)端應(yīng)用程序,它將資料發(fā)送(發(fā)布)到 Kafka 主題。它負(fù)責(zé)建立訊息(記錄)並將其傳送到 Kafka 叢集。生產(chǎn)者根據(jù)其配置和訊息金鑰的存在來(lái)決定儲(chǔ)存訊息的主題分區(qū)。生產(chǎn)者負(fù)責(zé)但不限於:

  • 訊息撰寫(xiě):
    • 每個(gè)訊息由一個(gè)鍵(可選)、一個(gè)值(實(shí)際資料)和元資料組成。
    • key決定訊息的分區(qū),確保具有相同key的訊息的順序。
  • 分區(qū)分配:
    • 如果提供了金鑰,生產(chǎn)者將使用雜湊演算法來(lái)決定分區(qū)。
    • 沒(méi)有金鑰,訊息以循環(huán)方式跨分區(qū)分發(fā)以實(shí)現(xiàn)負(fù)載平衡。
  • 壓縮:

    生產(chǎn)者可以壓縮訊息以減少網(wǎng)路頻寬和儲(chǔ)存使用。

消費(fèi)者

Kafka 消費(fèi)者 是一個(gè)客戶(hù)端應(yīng)用程序,它從Kafka 主題讀取訊息, 它按照自己的節(jié)奏從Kafka 分區(qū)檢索訊息,允許即時(shí)或批量處理數(shù)據(jù)。請(qǐng)注意,Kafka 不會(huì)將訊息推送給消費(fèi)者,而是透過(guò)請(qǐng)求資料從 Kafka 分區(qū)中拉取訊息。

消費(fèi)者也可以追蹤他們已處理的抵銷(xiāo)額。偏移量可以自動(dòng)手動(dòng)提交,確保消費(fèi)者失敗時(shí)資料不會(huì)遺失。這允許靈活的消費(fèi),包括透過(guò)重置偏移量來(lái)重播訊息。

消費(fèi)群

消費(fèi)者組是一組消費(fèi)者,它們合作消費(fèi)來(lái)自某些主題的數(shù)據(jù),從而允許分散式處理主題的訊息。

主題的分區(qū)在群組內(nèi)的消費(fèi)者之間劃分,確保每個(gè)訊息僅由群組內(nèi)的一個(gè)消費(fèi)者處理。多個(gè)消費(fèi)組可以獨(dú)立消費(fèi)同一個(gè)主題,互不干擾。

當(dāng)新的消費(fèi)者加入群組或現(xiàn)有消費(fèi)者失敗時(shí),Kafka 會(huì)在群組中的消費(fèi)者之間重新分配分區(qū),以確保覆蓋所有分區(qū)。

序列化和反序列化

Kafka中的序列化和反序列化是將資料在其原始格式和位元組數(shù)組之間進(jìn)行轉(zhuǎn)換以進(jìn)行傳輸和存儲(chǔ),從而使生產(chǎn)者和消費(fèi)者能夠高效地進(jìn)行通訊。

序列化

是將物件或資料結(jié)構(gòu)轉(zhuǎn)換為位元組流以便傳輸或儲(chǔ)存的過(guò)程。在生產(chǎn)者將資料傳送到 Kafka 主題之前,它將資料(鍵和值)序列化為位元組數(shù)組。

常見(jiàn)序列化格式:

  • JSON:人類(lèi)可讀,廣泛相容。
  • Avro:緊湊高效,基於模式。
  • Protobuf:緊湊、基於模式且與語(yǔ)言無(wú)關(guān)。
  • 字串:簡(jiǎn)單的基於文字的序列化。
  • 自訂序列化:滿(mǎn)足特定於應(yīng)用程式的需求。

反序列化

是相反的過(guò)程,其中位元組流被轉(zhuǎn)換回其原始物件或資料結(jié)構(gòu)。當(dāng)消費(fèi)者從 Kafka 主題讀取資料時(shí),它將位元組數(shù)組反序列化回可用的格式進(jìn)行處理。

壓縮

壓縮是指在儲(chǔ)存或傳輸訊息之前減少訊息的大小。它透過(guò)在生產(chǎn)者、代理商和消費(fèi)者之間發(fā)送較小的有效負(fù)載來(lái)優(yōu)化儲(chǔ)存使用、減少網(wǎng)路頻寬消耗並提高整體效能。

當(dāng)生產(chǎn)者向 Kafka 主題發(fā)送訊息時(shí),它可以在傳輸之前對(duì)訊息進(jìn)行壓縮。壓縮的訊息原樣儲(chǔ)存在代理程式上,並由消費(fèi)者在讀取訊息時(shí)解壓縮。

優(yōu)點(diǎn)

  • 減少網(wǎng)路頻寬:較小的有效負(fù)載意味著透過(guò)網(wǎng)路傳輸?shù)馁Y料較少。
  • 較低的儲(chǔ)存需求:壓縮訊息佔(zhàn)用較少的磁碟空間。
  • 提高吞吐量:較小的訊息可以實(shí)現(xiàn)更快的資料傳輸和處理。

什麼時(shí)候使用?

  • 訊息大小較大的用例:壓縮大大減少了資料大小。
  • 高吞吐量系統(tǒng):減少網(wǎng)路和儲(chǔ)存資源的壓力。
  • 批次:當(dāng)生產(chǎn)者將多個(gè)訊息一起批次時(shí),壓縮效果最佳。

雖然壓縮可以節(jié)省資源,但必須平衡 CPU 使用率和壓縮優(yōu)勢(shì)之間的權(quán)衡,選擇適合您的用例的編解碼器。

支援的壓縮類(lèi)型

  • 無(wú): 無(wú)壓縮(預(yù)設(shè))。
  • Gzip:壓縮比高,但CPU佔(zhàn)用率較高。
  • Snappy:平衡的壓縮速度和CPU使用率,適合即時(shí)使用案例。
  • LZ4:更快的壓縮和解壓縮,針對(duì)低延遲系統(tǒng)進(jìn)行了最佳化。
  • Zstd: 高壓縮比,效能比 Gzip 更好,較新的 Kafka 版本支援。

調(diào)音

最佳化 Apache Kafka 的效能涉及微調(diào)各個(gè)元件以有效平衡吞吐量和延遲。本文僅觸及該主題的表面,以下是調(diào)優(yōu) Kafka 時(shí)需要考慮的一些方面:

  • 分區(qū)管理:

    • 分區(qū)計(jì)數(shù):增加分區(qū)數(shù)量以增強(qiáng)並行性和吞吐量。但是,請(qǐng)避免過(guò)多的分區(qū)以防止管理開(kāi)銷(xiāo)。根據(jù)您的消費(fèi)者能力和所需的消費(fèi)率調(diào)整分區(qū)數(shù)量。
  • 生產(chǎn)者配置:

    • 批次:配置batch.size和linger.ms以實(shí)現(xiàn)高效的訊息批次,減少請(qǐng)求數(shù)量並提高吞吐量。
    • 壓縮: 實(shí)作壓縮(例如,compression.type=snappy)以減少訊息大小,從而減少網(wǎng)路和儲(chǔ)存使用。請(qǐng)注意壓縮帶來(lái)的額外 CPU 開(kāi)銷(xiāo)。
  • 消費(fèi)者配置:

    • 取得設(shè)定:調(diào)整 fetch.min.bytes 和 fetch.max.wait.ms 以控制消費(fèi)者檢索訊息的方式,根據(jù)應(yīng)用程式的需求平衡延遲和吞吐量。

實(shí)際例子

想像一個(gè)應(yīng)用程式記錄房間內(nèi)的溫度並使用 Kafka 傳輸該數(shù)據(jù),然後另一個(gè)應(yīng)用程式處理該數(shù)據(jù)。為簡(jiǎn)單起見(jiàn),我們將僅關(guān)注 Kafka 方面,生產(chǎn)者和消費(fèi)者都在同一應(yīng)用程式中實(shí)現(xiàn)。在這種情況下,特定時(shí)刻記錄的每個(gè)溫度都代表一個(gè)事件:

{
  temperature: 42,
  timeStamp: new Date(),
};

所有程式碼都將在此儲(chǔ)存庫(kù)中。

首先,我們需要一個(gè) Kafka 代理,但我們不需要在我們的機(jī)器中安裝 Kafka,只需使用這個(gè) Docker Kafka 映像即可。

先拉取該影像:

docker 拉 apache/kafka

然後運(yùn)行它,映射 Kafka 在我們機(jī)器上的同一連接埠上偵聽(tīng)的連接埠:

docker run -d -p 9092:9092 --name Broker apache/kafka:latest

就是這樣,我們有一個(gè)正在運(yùn)行的 Kafka 代理,在繼續(xù)之前,您可能想通過(guò)創(chuàng)建主題、發(fā)送和使用訊息來(lái)嘗試它,只需按照該圖像頁(yè)面上的說(shuō)明進(jìn)行操作即可。

為了建立我們的應(yīng)用程序,我們將結(jié)合 NestJS 和 KafkaJS,首先使用 Nest CLI 建立應(yīng)用程式

嵌套新的我的巢項(xiàng)目

在專(zhuān)案資料夾內(nèi)安裝kafkajs

npm 我卡夫卡

並產(chǎn)生以下模組

巢g莫卡夫卡

nest g mo 製作人

巢 g mo 消費(fèi)者

巢穴溫度

Kafka 模組 將處理所有 Kafka 特定的操作,包括管理用於連接、斷開(kāi)連接、發(fā)送和接收訊息的消費(fèi)者和生產(chǎn)者類(lèi)別。這將是唯一直接與 kafkajs 套件互動(dòng)的模組。

生產(chǎn)者和消費(fèi)者模組將充當(dāng)發(fā)布-訂閱平臺(tái)(在本例中為 Kafka)與應(yīng)用程式其餘部分之間的接口,抽象平臺(tái)特定的詳細(xì)資訊。

溫度模組將管理事件。它不需要知道正在使用哪個(gè)發(fā)布-訂閱平臺(tái),只需要消費(fèi)者和生產(chǎn)者即可運(yùn)作。

建立模組後,我們也會(huì)建立一個(gè)資料夾 src/interface 並在其中新增以下介面:

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

在 src/kafka/ 資料夾中新增實(shí)作這些介面的生產(chǎn)者和消費(fèi)者類(lèi)別:

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

不要忘記在 kafka.module.ts 中匯出這些類(lèi)別

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

現(xiàn)在我們可以轉(zhuǎn)到溫度模組並實(shí)例化這些 Kafka 類(lèi)別並開(kāi)始使用它們。然而,如果溫度模組不必?fù)?dān)心它正在使用哪個(gè) pub-sub 平臺(tái),那就更好了。相反,它應(yīng)該簡(jiǎn)單地與注入的生產(chǎn)者和/或消費(fèi)者一起工作,只專(zhuān)注於發(fā)送和接收訊息,而不管底層平臺(tái)如何。這樣,如果我們決定將來(lái)切換到不同的 pub-sub 平臺(tái),我們不需要對(duì)溫度模組進(jìn)行任何更改。

為了實(shí)現(xiàn)這種抽象,我們可以創(chuàng)建 Producer 和 Consumer 類(lèi)別來(lái)處理 Kafka Producer 和 Consumer 實(shí)作的細(xì)節(jié)。讓我們從製作人開(kāi)始:

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}

現(xiàn)在,消費(fèi)者:

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}

現(xiàn)在,我們可以專(zhuān)注於建立溫度模組。在Temperature.service.ts 檔案中,我們將建立一個(gè)方法來(lái)註冊(cè)溫度,在本例中,該方法將簡(jiǎn)單地使用生產(chǎn)者將溫度資料傳送到代理程式。此外,我們將實(shí)作一種方法來(lái)處理傳入訊息以用於演示目的。

這些方法可以由另一個(gè)服務(wù)或控制器呼叫。但是,為了簡(jiǎn)單起見(jiàn),在本範(fàn)例中,我們將在應(yīng)用程式啟動(dòng)時(shí)利用 onModuleInit 方法直接呼叫它們。

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

就是這樣!透過(guò)在 Docker 容器中執(zhí)行代理,您可以啟動(dòng)應(yīng)用程式來(lái)傳送和接收訊息。此外,您可以使用以下命令在代理容器內(nèi)開(kāi)啟 shell:

docker exec --workdir /opt/kafka/bin/ -it Broker sh

從那裡,您可以直接與代理互動(dòng)並向應(yīng)用程式發(fā)送訊息、從中接收訊息、創(chuàng)建新主題等。

這是包含本範(fàn)例程式碼的儲(chǔ)存庫(kù)。

以上是Kafka 基礎(chǔ)知識(shí)與實(shí)際範(fàn)例的詳細(xì)內(nèi)容。更多資訊請(qǐng)關(guān)注PHP中文網(wǎng)其他相關(guān)文章!

本網(wǎng)站聲明
本文內(nèi)容由網(wǎng)友自願(yuàn)投稿,版權(quán)歸原作者所有。本站不承擔(dān)相應(yīng)的法律責(zé)任。如發(fā)現(xiàn)涉嫌抄襲或侵權(quán)的內(nèi)容,請(qǐng)聯(lián)絡(luò)admin@php.cn

熱AI工具

Undress AI Tool

Undress AI Tool

免費(fèi)脫衣圖片

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅(qū)動(dòng)的應(yīng)用程序,用於創(chuàng)建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線(xiàn)上人工智慧工具。

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費(fèi)的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費(fèi)的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強(qiáng)大的PHP整合開(kāi)發(fā)環(huán)境

Dreamweaver CS6

Dreamweaver CS6

視覺(jué)化網(wǎng)頁(yè)開(kāi)發(fā)工具

SublimeText3 Mac版

SublimeText3 Mac版

神級(jí)程式碼編輯軟體(SublimeText3)

熱門(mén)話(huà)題

Java vs. JavaScript:清除混亂 Java vs. JavaScript:清除混亂 Jun 20, 2025 am 12:27 AM

Java和JavaScript是不同的編程語(yǔ)言,各自適用於不同的應(yīng)用場(chǎng)景。 Java用於大型企業(yè)和移動(dòng)應(yīng)用開(kāi)發(fā),而JavaScript主要用於網(wǎng)頁(yè)開(kāi)發(fā)。

掌握J(rèn)avaScript評(píng)論:綜合指南 掌握J(rèn)avaScript評(píng)論:綜合指南 Jun 14, 2025 am 12:11 AM

評(píng)論arecrucialinjavascriptformaintainingclarityclarityandfosteringCollaboration.1)heelpindebugging,登機(jī),andOnderStandingCodeeVolution.2)使用林格forquickexexplanations andmentmentsmmentsmmentsmments andmmentsfordeffordEffordEffordEffordEffordEffordEffordEffordEddeScriptions.3)bestcractices.3)bestcracticesincracticesinclud

JavaScript評(píng)論:簡(jiǎn)短說(shuō)明 JavaScript評(píng)論:簡(jiǎn)短說(shuō)明 Jun 19, 2025 am 12:40 AM

JavascriptconcommentsenceenceEncorenceEnterential gransimenting,reading and guidingCodeeXecution.1)單inecommentsareusedforquickexplanations.2)多l(xiāng)inecommentsexplaincomplexlogicorprovideDocumentation.3)

JavaScript數(shù)據(jù)類(lèi)型:深度潛水 JavaScript數(shù)據(jù)類(lèi)型:深度潛水 Jun 13, 2025 am 12:10 AM

JavaScripthasseveralprimitivedatatypes:Number,String,Boolean,Undefined,Null,Symbol,andBigInt,andnon-primitivetypeslikeObjectandArray.Understandingtheseiscrucialforwritingefficient,bug-freecode:1)Numberusesa64-bitformat,leadingtofloating-pointissuesli

如何在JS中與日期和時(shí)間合作? 如何在JS中與日期和時(shí)間合作? Jul 01, 2025 am 01:27 AM

JavaScript中的日期和時(shí)間處理需注意以下幾點(diǎn):1.創(chuàng)建Date對(duì)像有多種方式,推薦使用ISO格式字符串以保證兼容性;2.獲取和設(shè)置時(shí)間信息可用get和set方法,注意月份從0開(kāi)始;3.手動(dòng)格式化日期需拼接字符串,也可使用第三方庫(kù);4.處理時(shí)區(qū)問(wèn)題建議使用支持時(shí)區(qū)的庫(kù),如Luxon。掌握這些要點(diǎn)能有效避免常見(jiàn)錯(cuò)誤。

JavaScript與Java:開(kāi)發(fā)人員的全面比較 JavaScript與Java:開(kāi)發(fā)人員的全面比較 Jun 20, 2025 am 12:21 AM

JavaScriptIspreferredforredforwebdevelverment,而Javaisbetterforlarge-ScalebackendsystystemsandSandAndRoidApps.1)JavascriptexcelcelsincreatingInteractiveWebexperienceswebexperienceswithitswithitsdynamicnnamicnnamicnnamicnnamicnemicnemicnemicnemicnemicnemicnemicnemicnddommanipulation.2)

JavaScript:探索用於高效編碼的數(shù)據(jù)類(lèi)型 JavaScript:探索用於高效編碼的數(shù)據(jù)類(lèi)型 Jun 20, 2025 am 12:46 AM

javascripthassevenfundaMentalDatatypes:數(shù)字,弦,布爾值,未定義,null,object和symbol.1)numberSeadUble-eaduble-ecisionFormat,forwidevaluerangesbutbecautious.2)

為什麼要將標(biāo)籤放在的底部? 為什麼要將標(biāo)籤放在的底部? Jul 02, 2025 am 01:22 AM

PlacingtagsatthebottomofablogpostorwebpageservespracticalpurposesforSEO,userexperience,anddesign.1.IthelpswithSEObyallowingsearchenginestoaccesskeyword-relevanttagswithoutclutteringthemaincontent.2.Itimprovesuserexperiencebykeepingthefocusonthearticl

See all articles