Apache Kafka Nedir? Cluster Kafka Kurulum Nasıl Yapılır?

Arif KIZILTEPE
7 min readMay 1, 2020

--

Apache Kafka Nedir?

Teknolojinin gelişmesine bağlı olarak yazılım dünyasında da birçok yenilik mevcut. Buna göre artan sosyal medya eğilimi ve insanların ihtiyaçları, yine yazılım programlama ihtiyaçlarına da yol açmıştır. Apache kafka kavramı da bu noktada geliştirilen bir yapıdır. Linkedln tarafından geliştirilen bu framework yapısı şuan Apache bünyesinde yer almaktadır. Büyük verileri anlık olarak kaydeden, depolayan ve analiz eden bu açık kaynak kodlu framework büyük verilerin hızlı biçimde analiz edilmesi ve depolanması için kullanılmaktadır. Ayrıca analiz etmek için de mesajlaşma sistemini kullanır.

Hangi Amaçla Kullanılır?

Apache kafka, eş zamanlı veri akışı ve analizi yapan şirketler ya da kuruluşlar için anlık olarak gelebilen güncel bilgiler eşliğinde şirketlerin ne stoklamaları ve ne satmaları gibi kararlar almasında yardımcı olan bir sistemdir. Şirket ya da kuruluşlar ise karar verme noktasında verileri analiz edip depolamak için performansı yüksek olan Apache Kafka’yı kullanır.

Apache kafkanın tam olarak anlaşılabilmesi adına şu örnek verilebilir; Anlık ve popüler olarak yüzlerce hatta binlerce aktif kullanıcıya sahip olan herhangi bir web sitesi, aktif kullanıcıların web sitesi içerisinde davranış ve hareketlerinden haberdar olmak istemektedir. Böyle bir şey düşünüldüğünde ise binlerce kullanıcı aynı zaman içerisinde sitede aktif halde birçok işlem yapmakta ve bir sayfadan diğer sayfaya ilerlemekte, butonlara tıklamaktadır. Bu veri trafikleri içerisinde verilerin daha kolay toplanıp depo edilmesini ise web siteleri Apache Kafka ile gerçekleştirmektedir. Kullandığı mesajlaşma sistemi ile kullanıcı loglarını anlık olarak kaydeden ve analiz eden bu sistemi Spotify, Netflix gibi şirketler de kullanmaktadır.

Apache ZooKeeper

Apache kafkanın ürünleri arasında Apache ZooKeeper yer almaktadır. Apache yazılım vakfı tarafından geliştirilen bir yazılım projesi olan apache zookeeper, büyük dağıtık sistemler için kullanılan dağıtık bir yapılandırma hizmetidir. Üst düzey bir Apache projesi olan ZooKeeper, adlandırma katkı ve senkronizasyon hizmeti yapmak adına kullanılan bir hiyerarşik anahtar değer deposudur. Genellikle dağıtık sistemler için kullanılmaktadır.

Apache ZooKeeper projesi, yedekli hizmetler aracılığı işe yüksek kullanılabilirliği desteklemektedir. Bu sayede müşteriler, ilk etapta cevap vermese dahi başka bir ZooKeeper liderine sorabilirler. Apache ZooKeeper, verileri bir ağaç veri yapısı ya da dosya sistemi şeklinde hiyerarşik bir ad altında depolamaktadır.

İstemciler düğümler üzerinden verileri okuyabilir ve düğümlere bu verileri yazabilir. Apache ZooKeeper sayesinde bu şekilde bir paylaşımlı yapılandırma hizmetinden yararlanabilirler.

ZooKeeper’in başlıca özellikleri:

  • Güvenilir bir sistemdir,
  • Basit bir mimariye sahiptir,
  • İşlemleri hızlı biçimde işleyebilir,
  • Ölçeklenebilir bir yapısı vardır.

Çoğu önemli şirketin Apache Kafka ve Apache ZooKeeper mimarilerini kullandıkları bilinmektedir.

Kafka uygulamasında aşağıdaki ifadeleri çokça kullanacağım.

  • Producer : Message’ları publish eden yani kafka’ya message üreten yapının/uygulamanın adı.
  • Consumer : Publish edilmiş message’ları retrieve/consume eden uygulama.
  • Zookeper : Distributed olarak multiple instance çalışan uygulamaları koordine etmede kullanılan bir uygulamadır.

Kurulum sırasında ve testlerimizde çok fazla port görebilirsin. Bizim için önemli olan 2 tanesi

2181 : Zookeeper

9092 : Kafka

Cluster Kafka Kurulumu

Cluster kafka kurulumu için 2 adet CentOS 7 sunucu kurulumu yapmamız gerekiyor. Benim kullanacağım sunucu bilgileri;

Server 1 = 10.2.41.25

Server 2 = 10.2.41.213

Server 3 = 10.2.41.79

Not: Yapılan bazı işlemlerin sonlarına parantez içerisinde “Her iki sunucuda” yazacağım. Bu işlemin her iki sunucuda da yapılması gerektiği anlamına gelmektedir.

Kafka uygulaması javaya ihtiyaç duyduğu için öncelikle sunucuya java kurululu yapılır. (Tüm sunucularda)

yum -y install java-11-openjdk java-11-openjdk-devel unzip wget git

Kurulum için güncel dosya kafka dosyaları indirilir. (Tüm sunucularda)

curl "https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o /opt/kafka.tgz

İndirilen dosya /opt altına çıkartılır. (Tüm sunucularda)

tar -xvzf kafka.tgz

Kafka uygulaması portable bir uygulama olduğu için bulunduğu dizinde çalışacaktır. Bu aşamadan sonra kafka ve zookeeper uygulamasını servis haline getireceğiz ve gerekli konfigürasyonu tamamlayacağız.

Konfigürasyon

Cluster Zookeeper Konfigürasyonu

Cluster zookeeper konfigürsayonu için kafka dizini altındaki zookeeper.properties aşağıdaki parametreler yazılır.

Server 1

dataDir=/opt/data
initLimit=5
syncLimit=2
tickTime=2000
server.1=0.0.0.0:2888:3888
server.2=10.2.41.213:2888:3888
server.3=10.2.41.79:2888:3888

Server 2

dataDir=/opt/data
initLimit=5
syncLimit=2
tickTime=2000
server.1=10.2.41.25:2888:3888
server.2=0.0.0.0:2888:3888
server.3=10.2.41.79:2888:3888

Server 3

dataDir=/opt/data
initLimit=5
syncLimit=2
tickTime=2000
server.1=10.2.41.25:2888:3888
server.2=10.2.41.213:2888:3888
server.3=0.0.0.0:2888:3888

Daha önce belirlediğimiz /opt/data dizini oluşturulur ve zookeeper için gerekli olan myid dosyası id numaraları ile oluşturulur.

Server 1

mkdir /opt/data
echo "1" > /opt/data/myid

Server 2

mkdir /opt/data
echo "2" > /opt/data/myid

Server 3

mkdir /opt/data
echo "3" > /opt/data/myid

Cluster Kafka Konfigürasyonu

Cluster yapıdaki kafka uygulamalarını zookeeper uygulaması ile bağlantısını sağlamak için

server.properties içerisinde aşağıdaki parametreler güncellenir.

Server 1

zookeeper.connect=10.2.41.25:2181,10.2.41.213:2181,10.2.41.79:2181
advertised.host.name=10.2.41.25
broker.id=0

Server 2

zookeeper.connect=10.2.41.25:2181,10.2.41.213:2181,10.2.41.79:2181
advertised.host.name=10.2.41.213
broker.id=1

Server 3

zookeeper.connect=10.2.41.25:2181,10.2.41.213:2181,10.2.41.79:2181
advertised.host.name=10.2.41.79
broker.id=2

Konfigürasyon işlemi tamamlandıktan sonra servis oluşturmak için aşağıdaki örnek servis dosyaları kullanılır. (Tüm sunucularda)

vi /etc/systemd/system/zookeeper.service[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=root
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
vi /etc/systemd/system/kafka.service[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=root
ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1'
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Bir sonraki yazımdan kullanacağım kafka-manager uygulaması için /etc/environment içerisine aşağıdaki parametre ekledir. Bu parametre bahsettiğim kafka-manager uygulamasının brokerlar ile jmx üzerinden iletişim kurabilmesi için gerekmektedir. (Tüm sunucularda)

JMX_PORT=9989
export ZOOKEEPER_HOME=/opt/data

Servisler aktif edilir. (Tüm sunucularda)

systemctl start kafka
systemctl start zookeeper

Not: Start işlemlerini yaparken mutlaka logs/server.log takip edilmesi faydalı olacaktır.

Topic Oluşturma

Oluşturduğumuz cluster kafka konfigürasyonunda topic oluşturmak için aşağıdaki komut kullanılır. Bu işlemi herhangi bir sunucuda yapmak yeterli olacaktır.

Not : Topolojimizde 2 adet kafka bulunduğu için replication factor parametresine en fazla 2 verebilir. Bu parametre yazılan mesajın kaç sunucuya kopyalanacağını belirler. — partitions 4 parametresi ise her sunucuda 4 farklı partition olacağını belirtir. Bu bize birden fazla consumer ile daha hızlı okuma imkanı sağlar.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic ArifTestTopic

Oluşturduğumuz topic’i görüntülemek için aşağıdaki komut kullanılır.

bin/kafka-topics.sh --describe --topic ArifTestTopic- zookeeper localhost:2181

Çalıştırdığımız komutun çıktısında oluşturduğumuz topic’i partition ve bu partitionların lider bilgilerini görebiliriz.

Mesaj Gönderme ve Mesaj Alma

Mesaj gönderme işlemi için bir uygulama ayağa kaldırmaya gerek yok. Kendi testlerim için kullandığım autosendmessage.sh scriptti kullanabilirsiniz. Bu script CentOS üzerinde bulunan word dosyasından kelimeleri random bir şekilde okuyarak mesaj oluşturur ve bunu gönderir.

Scriptti biraz daha detaylı incelediğinizde — broker-list parametresinde 2 sunucunun kafka adreslerinin yazdığını görüyoruz. Bunun sebebi script mesaj yazarken eğer cluster yapımızda sunuculardan biri gider ise diğer sunucuya bağlanmasını sağlar.

Görünce nedir bu P1 — $messcounst dememeniz için minik bir açıklama yapayım. Ben bu scriptti kendi testlerim için yazmıştım ve testlerimde 10 adet producer oluşturdum. P1= Producer 1 anlamına geliyor mesaj sayınısını da failover testleri yaparken mesaj sayınısı kontrol etmek için kullanmıştım.

Scripttin çalışabilmesi için CentOS üzerine aşağıdaki paketin kurulması gerekmektedir.

yum install words

Artık başarılı bir şekilde oluşturduğumuz topic içerisine mesaj gönderiyoruz. Şimdi bu mesajları okumada bunun için consumer grup oluşturacağız ve okuma işlemine başlayacağız.

Consumer grup oluşturmak için config/consumer.properties dosyasından bir kopya oluşturun ve bu kopyaladığınız prop içerisinde aşağıdaki alanları güncelleyin.

bootstrap.servers=19.2.41.25:9092,10.2.41.213:9092 group.id=ArifTestConsumerGroup

Producer işleminde olduğu gibi consumer işleminde de failover durumda diğer sunucudan işlemleri yapabilmesi için her iki sunucu bilgileri yazılır.

Oluşturma işlemi bittikten sonra şu komut ile real-time artık yazılan mesajları okuyabilirsiniz.

bin/kafka-console-consumer.sh --bootstrap-server 10.2.41.213:9092,10.2.41.25:9092 - topic ArifTestTopic - consumer.config config/consumerArif.properties

Yukarıdaki komut sadece çalıştığı zamandan sonraki mesajları gösterir eğer tüm mesajları ve gelecek yeni mesajları okumak istiyorsanız — from-beginning parametresi ekleyerek şu komutu kullanabilirsiniz.

bin/kafka-console-consumer.sh --bootstrap-server 10.2.41.213:9092,10.2.41.25:9092 --topic ArifTestTopic --from-beginning --consumer.config config/consumerArif.properties

Biliyorsunuz biz yapımızı oluştururken 4 partition oluşturmuştuk bu durumda gelen mesaj bu 4 partition içerisine round robin mantığında dağıtılıyor. Sadece 2. partition içerisindeki mesajı okumak için — partition 2 parametresi eklenir ve şu komut kullanılabilir.

bin/kafka-console-consumer.sh --bootstrap-server 10.2.41.213:9092,10.2.41.25:9092 --topic ArifTestTopic --partition 2 --from-beginning --consumer.config config/consumerArif.properties

Yukarıdaki işlem genellikle çok fazla mesaj alan sistemlerde her partition için bir consumer oluşturularak mesajların hızlıca okunması için kullanılır.

Kafka Manager Kurulumu

Kafka manager yahoo tarafından geliştirilmiş kafka ekosistemini yönetmek ve görüntülemek için kullanılan bir web guidir. Kurulum mevcut sunuculardan birini kullanabilirsiniz.

Bu yazımda kafka manager uygulamasını anlatacağım fakat çalıştığım bir projede işlerimi daha kolay yapabilmem için geliştirdiğim ve geliştirmeye devam edeceğim ApacheKHC(Apache Kafka Health Checker) uygulamama aşağıdaki adresten erişebilir ve kullanabilirsiniz.

https://github.com/kzltp/Apache-Kafka-Health-Checker

Öncelikle gerekli paketlerin kurulumları yapılır.

Openjdk kurulum işlemleri bittinten sonra kafka manager uygulamasını kurmak için gerekli olan scala ve sbt uygulaması indirilir ve kurulum yapılır.

wget https://downloads.lightbend.com/scala/2.12.2/scala-2.12.2.rpmsudo yum localinstall -y scala-2.12.2.rpmcurl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.reposudo yum install sbt

Gerekli kurulumlar yapıldıktan sonra kafka manager uygulaması git üzerinden geçilir ve gerekli adımlar takip edilerek kurulum yapılır.

git clone https://github.com/yahoo/kafka-manager.gitcd ./kafka-managersbt clean dist

Build işlemi bittikten sonra dosya /opt/ altına kopyalanır.

mv ./target/universal/kafka-manager-1.3.3.8.zip /optcd /optunzip cmak-3.0.0.4.zipmv cmak-3.0.0.4 kafka-manager

/opt/kafka-manager/conf/application.conf içerisindeki “cmak.zkhosts” parametresine mevcut zookeeper bilgileri yazılır.

Kurulum ve konfigürsayon işlemleri bittikten sonra “/opt/kafka-manager/bin/cmak” çalıştırılarak uygulama açılır.

9000 portuna bağlanarak uygulamaya erişilir.

Cluster>Add Cluster yolu izlenerek mevcut ekosistem bilgileri yazılır.

Ekran Görüntüleri;

--

--

No responses yet