Apache Kafka ve Spring Boot

Fatma Delen
5 min readFeb 7, 2021

--

Publish-Subscribe tabanlı bir dağıtık mesajlaşma sistemidir.(Distributed Messaging System)

Mesajlaşma sistemi verinin bir uygulamadan diğerine aktarılmasını sağlar, dağıtık mesajlaşma aslında güvenilir bir kuyruk yapısı(Message Queue) olarak da adlandırılır.

Mesajlaşma sistemleri 2 yapıda olabilir.
— Point-to-Point
Bu yapıda birden fazla alıcı olabilmesine rağmen, bir mesaj sadece bir alıcıya iletilir.

— Publisher-Subscriber
Bu yapıda ise mesajlar bir kategori altında toplanırlar. Alıcılar birden fazla kategoriden veri çekebilir. Bu sistemde mesaj gönderen uygulamalara Publisher, alıcı uygulamalara da Subscriber adı verilir.

Peki Kafka’ya neden ihtiyaç duyuyoruz?

Veri gün geçtikçe büyüyor ve büyüyor olması, bilgi akışlarının gerçek zamanlı olmasını negatif etkileyen bir durum haline getiriyor. Amaç büyük veriyi tutmak değil, bu veriyi toplayıp ilgili sistemlere hatasız ve hızlı biçimde aktarmaktır. Genelde Kafka tek başına ele alınmamaktadır.(Kafka’yı kullanarak verinin ElasticSearch, Hadoop, Spark gibi sistemlere aktarılması söz konusu.)Verinin aktarılacağı sistemler kapalı olsa bile bir süre Kafka’da tutma imkanı bulunmaktadır.(Uç sistemlerden birinin çökmesi durumunda mesaj kaybını da engellemektedir.)

Terminoloji

— Publisher : Mesajı gönderen yerdir.

— Subscriber: Mesajı alan yerdir.

— Producer(Üretici): Publisher-Subscriber kelimelerinden Publisher’a karşıklık gelir ve bir ya da birden fazla Topic’e mesaj gönderen birimdir.

— Consumer(Tüketici): Publisher-Subscriber kelimelerinden Subscriber’a karşılık gelir Broker’dan mesajları okur. Bir ya da birden fazla Topic üzerinden veri okuyabilir.

— Topic: Mesajların tutulduğu kategorilere verilen addır.(Veritabanındaki tablo olarak da düşünebilirsiniz.)

— Partition: Topiclerden bir araya gelip oluşturduğu yapıdır. Her Partition farklı bir sunucuda olabilir. Böylece bir Topic birden fazla sunucuya yatay olarak ölçeklendirilebilir.

— Broker: Partitionlar bir araya gelip oluşturduğu yapıdır. Diğer bir deyişle de tek bir Kafka sunucusuna(Kafka Cluster’daki her bir Node) Broker adı verilir. Üzerinden akan mesaj başka bir Consumer(Tüketici-ler) tarafından okunabilir.

— Cluster: Kafka, dağıtılmış bir sistemdir(Distributed System). Bir Kafka Cluster, iş yükünü paylaşan birden çok broker içerir.

— Offset: Kafka’nın bir tüketiciye (consumer) gönderdiği son mesajın göstergesidir.

— ZooKeeper: Herhangi bir Broker eklendiğinde ya da çalışmadığı durumlarda, Broker hakkında Producer ve Consumer’ı bilgilendiren bir servis olarak tanımlayabiliriz. Kafka, Zookeeper’ı çeşitli configuration bilgilerini store etmek için kullanmaktadır.( Evet, bu cümleden, kafka’nın zookeeper ile beraber çalışmasının zorunlu olduğunu çıkartabiliriz.)

Apache Kafka Özellikleri

Apache Kafka hızlı, genişletilebilir ve güvenilir olması gibi özelliklerle ön plana çıkıyor.

1. Hızlı: Saniyede 2 milyonluk bir yazma performansına sahiptir.

2. Güvenilir: Mesajları disk üzerinde saklar ve Cluster’da replike ederek veri kaybının önüne geçer.

3. Genişletilebilir: Sistemi durdurmadan genişletilebilme özelliğine sahiptir.

Kafka’nın yaptığı şey hasara uğramadan gerçek zamana oldukça yakın sürelerde veri akışkanlığını sağlamaktır.

Apache Kafka ve Spring Boot

Şimdiiii projemizi yapmaya başlayalım. Ama öncelikle Kafka’yı kuralım.

Apache Kafka Docker image’ine bu linkten ulaşabilirsiniz.

Zookeeper’sız bir kafka kullanamayacağımızdan bahsettiğimize göre, docker üzerinden öncelikle aşağıdaki komut ile “jplock/zookeeper” image’ini download edelim ve container’ı ayağa kaldıralım.

docker run -d — name zookeeper — publish 2181:2181 jplock/zookeeper:latest

Ardından docker ps ile containerlarımızı listeleyelim.

Container hazır durumda artık kafka kurulumuna geçebiliriz….

Kurulum sırasında zookeeper’da olduğu gibi ilgili port’u bind edip, kafka container’ını zookeeper’a link’leyeceğiz. Link’leme işlemi sayesinde iki container birbirleri ile konuşabilir hale geleceklerdir.

docker run — name kafka -d -p “9092:9092” -e KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 — link zookeeper:zookeeper ches/kafka

“KAFKA_ADVERTISED_HOST_NAME” u 192.168.99.100 olarak belirliyoruz. Ip adresi olarak localhost yerine bunu belirtmemizin sebebi ise, Kafka’yı multiple brokers çalıştırabilmektir.

docker ps ile containerlarımızı kontrol edelim

NOT: spotify/kafka’da containerları ayrı ayrı kurmak yerine bu docker image’yla hem zookeper hem kafka aynı docker image’i içinde birbirleriyle konfigurasyonu yapılmış şekilde sunulmuştur. İstersek bunu kullanabiliriz.

Şimdi projemizi yazmaya başlayalım :)

Öncelikle Spring Initializr üzerinden bir Spring Boot projesi oluşturalım.

Ardından spotify/kafka image ini oluşturmak için resource klasörünün altında “docker-compose.yml” dosyasını oluşturalım.

Docker image’ini başlatmak için:
docker-compose -f src\main\resources\docker-compose.yml up -d komutunu kullanalım.

Şimdi java kodlarımızı yazmaya başlayalım…

Öncelikle KafkaMessage adında bir sınıf oluşturalım, bu sınıf Kafka’ya gönderilen mesajları temsil etsin. Bu sınıfta bir id, mesaj ve mesajın gönderilme zamanı olsun.

Bir ya da birden fazla topic’e mesaj gönderen Producer’in KafkaProducerConfig sınıfını yazalım. KafkaTemplate sınıfını kullanarak mesaj gönderebiliriz.

ProducerFactory, Kafka Producer örneklerini oluşturmaktan sorumludur.
BOOTSTRAP_SERVERS_CONFIG — Kafka’nın çalıştığı host ve port
KEY_SERIALIZER_CLASS_CONFIG — Key için kullanılacak serileştirici sınıfı.
VALUE_SERIALIZER_CLASS_CONFIG — Value için kullanılacak serileştirici sınıfı.(KafkaMessage tipinde göndereceğimiz için JsonSeriliazer kullanmalıyız.)
KafkaTemplate ile göndereceğimiz mesajı KafkaMessage tipinde yapalım.
örn: kafkaTemplate.send(topicName, new KafkaMessage(“Hello”));

Şimdi bir yada birden fazla topic üzerinden veri okuyan Consumer’ın KafkaConsumerConfig sınıfını yazalım.

Value Deserializer’ı JsonDeserializer yapmalıyız çünkü gelen mesaj KafkaMessage olduğu için(Custom Message) bu şekilde düzenleyelim.

Şimdi topic e mesajın gönderilmesi için KafkaController classımızı oluşturalım.

Mesajın gönderilip gönderilmediği ile ilgili bilgi almak istiyorsak ListenableFuture’i kullanmalıyız. Bu sayede başarılı olduğu durumda veya hata durumunda log basabiliriz.

Son olarak giden mesajlarımızı dinleyebileceğimiz bir KafkaListenerService classı oluşturalım.

Belirlenen topic ve gruptaki mesajları dinlemek için @KafkaListener anotasyonunu kullanıyoruz. Bu anotasyonun çalışması için KafkaConsumerConfig classında @EnableKafka anotasyonunu kullanmalıyız.

Projeyi çalıştırıp Postman üzerinden istek attığımızda console da şöyle bir log göreceksiniz.

Listenerları birden çok topic, patition ve belirli bir başlangıç ​​ofseti dinleyecek şekilde yapılandırabiliriz. Örneğin, bir topic e gönderilen tüm mesajları, uygulama başlangıcında oluşturulduğu andan itibaren almak istiyorsak, başlangıç ​​offseti sıfıra ayarlayabiliriz:

Projeyi çalıştırdığımızda başlangıçtan itibaren gönderilen mesajları göreceksiniz.

Gördüğünüz gibi daha önce giden mesajlarda deneme amaçlı “fatma” mesajını sürekli atmışım :) Bunu Listener’a ulaşmadan önce filtrelemek için:

Projeyi tekrar çalıştırdığımızda artık “fatma” içeren mesajların ignore edildiğini görebilirsiniz.

😇Okuduğunuz için teşekkür ederim…😇

--

--