在現(xiàn)代數(shù)據(jù)架構(gòu)中,kafka常被用于傳輸各種類型的數(shù)據(jù),包括文本、json以及二進(jìn)制數(shù)據(jù),例如圖像或視頻流。當(dāng)處理二進(jìn)制數(shù)據(jù)時(shí),核心挑戰(zhàn)在于確保生產(chǎn)者正確序列化數(shù)據(jù),而消費(fèi)者能夠正確反序列化數(shù)據(jù)。java kafka api提供了靈活的配置選項(xiàng)來支持多種數(shù)據(jù)類型,但錯(cuò)誤的配置會(huì)導(dǎo)致運(yùn)行時(shí)錯(cuò)誤,其中最常見的就是類型轉(zhuǎn)換異常。
當(dāng)Kafka消費(fèi)者嘗試接收?qǐng)D像這類二進(jìn)制數(shù)據(jù)時(shí),如果配置不當(dāng),最常見的錯(cuò)誤是 java.lang.ClassCastException: class java.lang.String cannot be cast to class [B。這個(gè)錯(cuò)誤明確指出,消費(fèi)者預(yù)期接收的是字節(jié)數(shù)組([B),但實(shí)際從Kafka接收到的數(shù)據(jù)被反序列化成了字符串(java.lang.String)。
根本原因: Kafka消費(fèi)者通過ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG配置來確定如何將從Kafka主題中讀取的原始字節(jié)數(shù)據(jù)轉(zhuǎn)換成Java對(duì)象。如果生產(chǎn)者發(fā)送的是字節(jié)數(shù)組,而消費(fèi)者配置的是StringDeserializer,那么消費(fèi)者會(huì)將這些字節(jié)嘗試解碼為字符串,當(dāng)后續(xù)代碼試圖將這個(gè)字符串強(qiáng)制轉(zhuǎn)換為字節(jié)數(shù)組時(shí),就會(huì)拋出ClassCastException。
解決方案: 要正確接收二進(jìn)制數(shù)據(jù),必須將值反序列化器配置為ByteArrayDeserializer。
以下是修正后的Kafka消費(fèi)者配置示例:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; // 導(dǎo)入ByteArrayDeserializer import java.util.Properties; public class ImageConsumerConfig { public KafkaConsumer<String, byte[]> createConsumer(String bootstrapServers, String topic, String consumerId) { Properties prop = new Properties(); prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 關(guān)鍵修正:將值反序列化器設(shè)置為ByteArrayDeserializer prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerId); prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // 暫時(shí)注釋或根據(jù)需求調(diào)整,下文會(huì)詳細(xì)討論 // 消費(fèi)者聲明的泛型類型也必須與反序列化器匹配 KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(prop); // consumer.subscribe(Arrays.asList(topic)); // 訂閱可以在創(chuàng)建后進(jìn)行 return consumer; } }
通過將VALUE_DESERIALIZER_CLASS_CONFIG設(shè)置為ByteArrayDeserializer.class.getName(),消費(fèi)者將能夠正確地將接收到的字節(jié)數(shù)據(jù)反序列化為Java的byte[]類型,從而避免ClassCastException。
在解決了反序列化問題后,可能會(huì)遇到另一個(gè)現(xiàn)象:盡管數(shù)據(jù)流存在,但消費(fèi)者在接收到第一條記錄后,后續(xù)嘗試接收的數(shù)據(jù)似乎是空的或不完整的。這通常與消費(fèi)者循環(huán)邏輯和MAX_POLL_RECORDS_CONFIG的配置有關(guān)。
立即學(xué)習(xí)“Java免費(fèi)學(xué)習(xí)筆記(深入)”;
問題分析: 原始代碼片段中存在兩個(gè)關(guān)鍵點(diǎn)可能導(dǎo)致此問題:
結(jié)合這兩點(diǎn),每次poll調(diào)用最多返回一條記錄,并且這條記錄總是被存儲(chǔ)到message_send[0]中,導(dǎo)致數(shù)組的其他位置始終為null或未被填充。如果message_send是一個(gè)預(yù)先分配的固定大小數(shù)組,并且期望它能累積多條記錄,這種邏輯將導(dǎo)致只有第一個(gè)元素被有效填充(且可能被后續(xù)的poll結(jié)果覆蓋)。
解決方案: 要正確接收和存儲(chǔ)多條記錄,需要調(diào)整MAX_POLL_RECORDS_CONFIG并妥善管理數(shù)據(jù)存儲(chǔ)數(shù)組的索引。
以下是修正后的消費(fèi)循環(huán)示例,假設(shè)message_send是一個(gè)動(dòng)態(tài)列表,用于累積所有接收到的圖像:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; public class ImageConsumerLogic { // 假設(shè) dispatcher.consumer 已正確初始化 // 假設(shè) dispatcher.AcceptedNumberJobs 和 dispatcher.queue_size 是用于控制循環(huán)的計(jì)數(shù)器 // 為了示例清晰,這里簡(jiǎn)化了 dispatcher 的使用 public void consumeImages(KafkaConsumer<String, byte[]> consumer, String topic, int expectedRecords) { List<byte[]> receivedImages = new ArrayList<>(); // 使用列表動(dòng)態(tài)存儲(chǔ)接收到的圖像 System.out.println("Starting Consuming"); // 訂閱主題,通常在消費(fèi)者創(chuàng)建后訂閱一次即可 consumer.subscribe(Collections.singletonList(topic)); // 示例循環(huán)條件:直到接收到足夠數(shù)量的圖像或達(dá)到某個(gè)退出條件 while (receivedImages.size() < expectedRecords) { System.out.println("Polling for records..."); ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100)); // 增加poll超時(shí)時(shí)間以等待更多消息 if (records.isEmpty()) { System.out.println("No records received in this poll. Waiting..."); continue; // 如果沒有記錄,繼續(xù)下一次poll } System.out.println("Received " + records.count() + " records."); for (ConsumerRecord<String, byte[]> record : records) { // 直接處理或存儲(chǔ)接收到的字節(jié)數(shù)組 byte[] imageData = record.value(); receivedImages.add(imageData); // 將圖像數(shù)據(jù)添加到列表中 // 打印一些信息以驗(yàn)證 System.out.println("Received image with size: " + imageData.length + " bytes from offset: " + record.offset()); // 根據(jù)實(shí)際需求,這里可以進(jìn)一步處理 imageData,例如保存到文件、顯示等 } // 提交偏移量,確保下次從正確的位置開始消費(fèi) consumer.commitSync(); } System.out.println("Finished consuming. Total images received: " + receivedImages.size()); // 此時(shí) receivedImages 列表中包含了所有接收到的圖像數(shù)據(jù) } }
關(guān)鍵改進(jìn)點(diǎn):
在構(gòu)建Kafka消費(fèi)者應(yīng)用時(shí),除了上述核心問題的解決,還有一些通用的實(shí)踐建議可以幫助提升應(yīng)用的健壯性和性能:
正確地配置Kafka消費(fèi)者以接收二進(jìn)制數(shù)據(jù)是構(gòu)建可靠數(shù)據(jù)管道的基礎(chǔ)。通過將VALUE_DESERIALIZER_CLASS_CONFIG設(shè)置為ByteArrayDeserializer,可以有效解決ClassCastException。同時(shí),優(yōu)化消費(fèi)循環(huán)邏輯,特別是對(duì)MAX_POLL_RECORDS_CONFIG的理解和對(duì)數(shù)據(jù)存儲(chǔ)索引的正確管理,是確保所有消息都被完整接收的關(guān)鍵。遵循Kafka消費(fèi)者最佳實(shí)踐,如適當(dāng)?shù)钠屏抗芾?、資源關(guān)閉和異常處理,將進(jìn)一步提升應(yīng)用程序的穩(wěn)定性與效率。
以上就是Kafka Java消費(fèi)者接收?qǐng)D像數(shù)據(jù):類型轉(zhuǎn)換與多記錄處理實(shí)踐的詳細(xì)內(nèi)容,更多請(qǐng)關(guān)注php中文網(wǎng)其它相關(guān)文章!
Kafka Eagle是一款結(jié)合了目前大數(shù)據(jù)Kafka監(jiān)控工具的特點(diǎn),重新研發(fā)的一塊開源免費(fèi)的Kafka集群優(yōu)秀的監(jiān)控工具。它可以非常方便的監(jiān)控生產(chǎn)環(huán)境中的offset、lag變化、partition分布、owner等,有需要的小伙伴快來保存下載體驗(yàn)吧!
微信掃碼
關(guān)注PHP中文網(wǎng)服務(wù)號(hào)
QQ掃碼
加入技術(shù)交流群
Copyright 2014-2025 http://www.miracleart.cn/ All Rights Reserved | php.cn | 湘ICP備2023035733號(hào)