


Bagaimana untuk menghasilkan dan mengambil mesej dari Apache Kafka dengan Java?
Jul 11, 2025 am 01:43 AMKunci untuk menghasilkan dan memakan mesej Apache Kafka menggunakan Java adalah dengan betul mengkonfigurasi API pengeluar dan pengguna dan memahami proses asas mereka. 1. Pertama tambahkan kebergantungan klien Kafka untuk memastikan versi itu serasi dengan kluster; 2. Apabila menulis pengeluar, konfigurasikan bootstrap.servers, key.serializer dan value.serializer, dan buat contoh Kafkaproducer untuk menghantar mesej, perhatikan untuk menutup sumber dan pemprosesan panggilan balik pilihan; 3. Apabila menulis pengguna, konfigurasikan kumpulan.id, deserializer, dan lain -lain, gunakan Kafkaconsumer untuk melanggan topik dan tarik mesej dalam gelung, perhatikan strategi mengemukakan offset; 4. Masalah biasa termasuk konflik kumpulan.id, penyerahan offset yang tidak sesuai, dan penutupan sumber yang salah, dan lain-lain. Adalah disyorkan untuk menguji proses pengesahan dalam persekitaran yang berdiri sendiri terlebih dahulu.
Ia sebenarnya tidak sukar untuk menghasilkan dan menggunakan mesej Apache Kafka di Java. Inti terletak pada pemahaman konsep asas Kafka dan menguasai penggunaan API pengeluar dan pengguna. Selagi konfigurasi itu betul dan struktur kod jelas, penghantaran dan penerimaan mesej dapat dicapai dengan mudah.

Tambah Kafka Dependencies
Sebelum anda mula menulis kod, anda perlu memperkenalkan perpustakaan klien Kafka ke dalam projek anda. Jika anda menggunakan Maven, anda boleh menambah kebergantungan berikut dalam pom.xml
:
<geterlihat> <groupId> org.apache.kafka </groupId> <stifactid> Kafka-clients </artifactid> <versi> 3.6.0 </versi> </ketergantungan>
Versi ini agak baru dan agak stabil. Sudah tentu, anda juga boleh memilih versi klien yang sepadan berdasarkan versi kluster Kafka anda.

Tulis pengeluar kafka
Tugas pengeluar adalah menghantar mesej ke topik Kafka. Langkah -langkah utama termasuk mengkonfigurasi sifat, mewujudkan contoh pengeluar, membina rekod mesej dan menghantar.
Item konfigurasi utama adalah:

-
bootstrap.servers
: Alamat Broker Kafka -
key.serializer
danvalue.serializer
: Tentukan kaedah serialisasi kunci dan nilai.StringSerializer
biasanya digunakan.
Kod sampel adalah seperti berikut:
Sifat props = sifat baru (); props.put ("bootstrap.servers", "localhost: 9092"); props.put ("key.serializer", "org.apache.kafka.common.serialization.stringserializer"); props.put ("value.serializer", "org.apache.kafka.common.serialization.stringserializer"); Pengeluar <string, string> pengeluar = kafkaproducer baru <> (props); ProduceRrecord <string, string> record = produceRrecord baru <> ("my-topic", "kekunci", "nilai"); producer.send (rekod); producer.close ();
Beberapa perkara yang perlu diperhatikan:
- Lebih baik memanggil
close()
setelah menghantar untuk mengelakkan kebocoran sumber - Sekiranya anda ingin mengesahkan sama ada penghantaran berjaya, anda boleh menambah fungsi panggil balik.send
.send(record, callback)
- Sekiranya ia hanya ujian, anda boleh menghilangkan kunci
Tulis pengguna Kafka
Pengguna bertanggungjawab membaca mesej dari topik Kafka. Berbanding dengan pengeluar, logik pengguna sedikit lebih rumit kerana ia memerlukan secara aktif menarik mesej, memproses offset, dll.
Konfigurasi utama termasuk:
-
bootstrap.servers
: Juga tentukan alamat broker -
group.id
: ID Kumpulan Pengguna, mesti ditetapkan, jika tidak, ralat akan dilaporkan -
key.deserializer
danvalue.deserializer
: deserializer, biasanya menggunakanStringDeserializer
Proses pengguna yang mudah adalah seperti berikut:
Sifat props = sifat baru (); props.put ("bootstrap.servers", "localhost: 9092"); props.put ("kumpulan.id", "kumpulan ujian"); props.put ("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.stringDeserializer"); KafKaconsumer <String, String> Consumer = New KafKaconsumer <> (props); consumer.subscribe (Collections.SingLetOnList ("My-Topic")); sementara (benar) { ConsumerRecords <string, string> records = consumer.poll (duration.ofmillis (100)); untuk (consumerRecord <string, string> rekod: rekod) { System.out.println ("Diterima:" Record.Value ()); } }
Catatan:
- Kaedah
poll()
akan menarik sekumpulan data dan memprosesnya dalam gelung - Jangan lupa untuk mengemukakan offset selepas penggunaan, anda boleh memilih untuk menghantar secara automatik atau secara manual
- Sekiranya anda mahu mengambil hanya sekali dan kemudian keluar, anda boleh memecahkan gelung selepas diproses
Soalan dan langkah berjaga -jaga yang sering ditanya
Kadang -kadang anda akan mendapati bahawa mesej itu belum diterima atau telah dimakan berulang kali, yang biasanya disebabkan oleh sebab -sebab berikut:
- Kumpulan Pengguna.ID Menetapkan Kesalahan atau Konflik
- Keras frekuensi offset secara automatik terlalu tinggi atau terlalu rendah
- Kegagalan untuk menutup pengeluar atau pengguna dengan betul menghasilkan status yang tidak konsisten
- Broker Kafka tidak dimulakan atau rangkaian tidak disambungkan
Adalah disyorkan untuk terlebih dahulu menjalankan persekitaran yang berdiri sendiri semasa pembangunan dan kemudian pergi ke kluster.
Pada dasarnya itu sahaja. Proses menyambung ke Kafka di Java tidak rumit, tetapi beberapa butiran mudah diabaikan, seperti strategi bersiri, kumpulan.id, dan penyerahan. Selagi anda memberi perhatian kepada konfigurasi dan proses, anda boleh berjalan dengan cepat.
Atas ialah kandungan terperinci Bagaimana untuk menghasilkan dan mengambil mesej dari Apache Kafka dengan Java?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Alat AI Hot

Undress AI Tool
Gambar buka pakaian secara percuma

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Clothoff.io
Penyingkiran pakaian AI

Video Face Swap
Tukar muka dalam mana-mana video dengan mudah menggunakan alat tukar muka AI percuma kami!

Artikel Panas

Alat panas

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina
Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Topik panas

Penggunaan rasional tag semantik dalam HTML dapat meningkatkan kejelasan struktur halaman, aksesibilitas dan kesan SEO. 1. Digunakan untuk blok kandungan bebas, seperti jawatan blog atau komen, ia mesti mandiri; 2. Digunakan untuk kandungan berkaitan klasifikasi, biasanya termasuk tajuk, dan sesuai untuk modul yang berlainan halaman; 3. Digunakan untuk maklumat tambahan yang berkaitan dengan kandungan utama tetapi tidak teras, seperti cadangan sidebar atau profil pengarang. Dalam perkembangan sebenar, label harus digabungkan dan lain -lain, elakkan bersarang yang berlebihan, pastikan struktur mudah, dan sahkan rasionalitas struktur melalui alat pemaju.

Apabila anda menghadapi "operasi ini memerlukan peningkatan keizinan", ini bermakna anda memerlukan kebenaran pentadbir untuk diteruskan. Penyelesaian termasuk: 1. Klik kanan program "Run As Administrator" atau tetapkan pintasan untuk sentiasa dijalankan sebagai pentadbir; 2. Periksa sama ada akaun semasa adalah akaun pentadbir, jika tidak, menukar atau meminta bantuan pentadbir; 3. Gunakan Kebenaran Pentadbir untuk membuka command prompt atau PowerShell untuk melaksanakan perintah yang relevan; 4. Menghapuskan sekatan dengan mendapatkan pemilikan fail atau mengubah suai pendaftaran apabila perlu, tetapi operasi tersebut perlu berhati -hati dan memahami sepenuhnya risiko. Sahkan identiti kebenaran dan cuba kaedah di atas biasanya menyelesaikan masalah.

Terdapat tiga perbezaan utama antara yang boleh dipanggil dan boleh dijalankan di Jawa. Pertama, kaedah yang boleh dipanggil boleh mengembalikan hasilnya, sesuai untuk tugas -tugas yang perlu mengembalikan nilai, seperti yang boleh dipanggil; Walaupun kaedah run () runnable tidak mempunyai nilai pulangan, sesuai untuk tugas -tugas yang tidak perlu kembali, seperti pembalakan. Kedua, Callable membolehkan untuk membuang pengecualian yang diperiksa untuk memudahkan penghantaran ralat; Walaupun Runnable mesti mengendalikan pengecualian secara dalaman. Ketiga, Runnable boleh dihantar secara langsung ke benang atau executorservice, sementara yang boleh dipanggil hanya boleh dikemukakan ke executorservice dan mengembalikan objek masa depan untuk

JavaprovidesmultiplesynchronizationToolsforthreadsafety.1.SynchronizedBlockSensensureMutualExclusionByLockingMethodsorspecificcodesections.2.reentrantlockoffersadvancedControl, termasuktrylockandfairnesspolicies.condition

Mekanisme pemuatan kelas Java dilaksanakan melalui kelas, dan aliran kerja terasnya dibahagikan kepada tiga peringkat: memuatkan, menghubungkan dan memulakan. Semasa fasa pemuatan, kelas muat turun secara dinamik membaca bytecode kelas dan mencipta objek kelas; Pautan termasuk mengesahkan ketepatan kelas, memperuntukkan memori kepada pembolehubah statik, dan rujukan simbol parsing; Inisialisasi melakukan blok kod statik dan tugasan pembolehubah statik. Pemuatan kelas mengamalkan model delegasi induk, dan mengutamakan loader kelas induk untuk mencari kelas, dan cuba bootstrap, lanjutan, dan appliclassloader pada gilirannya untuk memastikan perpustakaan kelas teras selamat dan mengelakkan pemuatan pendua. Pemaju boleh menyesuaikan kelas, seperti UrlClassl

Kunci pengendalian pengecualian Java adalah untuk membezakan antara pengecualian yang diperiksa dan tidak terkawal dan menggunakan percubaan cuba, akhirnya dan pembalakan munasabah. 1. Pengecualian yang diperiksa seperti IOException perlu dipaksa untuk mengendalikan, yang sesuai untuk masalah luaran yang diharapkan; 2. Pengecualian yang tidak terkawal seperti NullPointerException biasanya disebabkan oleh kesilapan logik program dan kesilapan runtime; 3. Apabila menangkap pengecualian, mereka harus khusus dan jelas untuk mengelakkan penangkapan umum pengecualian; 4. Adalah disyorkan untuk menggunakan sumber-sumber cuba untuk menutup sumber secara automatik untuk mengurangkan pembersihan kod manual; 5. Dalam pengendalian pengecualian, maklumat terperinci harus direkodkan dalam kombinasi dengan rangka kerja log untuk memudahkan kemudian

Java menyokong pengaturcaraan asynchronous termasuk penggunaan aliran yang boleh diselesaikan, aliran responsif (seperti ProjectReactor), dan benang maya di Java19. 1.CompletableFuture meningkatkan kebolehbacaan dan penyelenggaraan kod melalui panggilan rantai, dan menyokong orkestrasi tugas dan pengendalian pengecualian; 2. ProjectReactor menyediakan jenis mono dan fluks untuk melaksanakan pengaturcaraan responsif, dengan mekanisme tekanan belakang dan pengendali yang kaya; 3. Thread maya mengurangkan kos konvensional, sesuai untuk tugas I/O-intensif, dan lebih ringan dan lebih mudah untuk berkembang daripada benang platform tradisional. Setiap kaedah mempunyai senario yang berkenaan, dan alat yang sesuai harus dipilih mengikut keperluan anda dan model campuran harus dielakkan untuk mengekalkan kesederhanaan

Kata kunci statik digunakan dalam Java untuk membuat pembolehubah dan kaedah yang tergolong dalam kelas itu sendiri, dan bukannya contoh kelas. 1. Pembolehubah statik dikongsi oleh contoh semua kelas dan sesuai untuk menyimpan data yang dikongsi oleh semua objek, seperti nama sekolah dalam kelas pelajar. 2. Kaedah statik tergolong dalam kelas dan tidak bergantung kepada objek. Mereka sering digunakan dalam fungsi alat, seperti math.sqrt (), dan hanya boleh mengakses ahli statik lain. 3. Blok kod statik digunakan untuk melakukan operasi permulaan apabila memuatkan kelas, seperti pemuatan perpustakaan atau penetapan log. 4. Penggunaan statik rasional dapat menguruskan sumber dan tingkah laku peringkat kelas secara berkesan.
