====== Kafka ======
===== Kafka Broker =====
==== Kafka (with ZooKeeper) in a VM ====
### Install JDK ###
JAVAVERSION=17
# Run only the package-manager commands that match this host's distribution.
# For Ubuntu
sudo apt update
sudo apt install -y "openjdk-${JAVAVERSION}-jdk" net-tools
# For RedHat
sudo dnf install -y "java-${JAVAVERSION}-openjdk" net-tools
### Download and unpack Kafka ###
SCALA_VERSION=2.13
KAFKA_VERSION=3.7.1
# If the download fails for this KAFKA_VERSION (older releases get moved off
# the main download site), switch to the archive site on the commented line.
DOWNLOAD_SITE=downloads.apache.org
#DOWNLOAD_SITE=archive.apache.org/dist
# Name shared by the tarball and the directory it unpacks to
KAFKA_DIST="kafka_${SCALA_VERSION}-${KAFKA_VERSION}"
# -f: fail on an HTTP error instead of saving the error page as the .tgz
curl -fOL "https://${DOWNLOAD_SITE}/kafka/${KAFKA_VERSION}/${KAFKA_DIST}.tgz"
tar xzf "${KAFKA_DIST}.tgz"
cd "${KAFKA_DIST}"
### Configure ZooKeeper ###
# Append the ensemble settings to the stock config file. The quoted 'EOF'
# delimiter keeps the shell from expanding anything inside the here-document.
cat >> ./config/zookeeper.properties << 'EOF'
tickTime=1000
initLimit=3600
syncLimit=15
# Below are the 3 VM hostnames with Zookeeper leader-connection and election ports
server.1=a.xyz:2888:3888
server.2=b.xyz:2888:3888
server.3=c.xyz:2888:3888
EOF
ZOOKEEPERDATA=/tmp/zookeeper # Change dataDir in zookeeper.properties and put the new loc here
mkdir -p "$ZOOKEEPERDATA"
echo "1" > "$ZOOKEEPERDATA/myid" # Use the ID defined in the "server.n" from zookeeper.properties
# Pre-create the log file as root, then hand it to the unprivileged user that
# runs ZooKeeper ("ubuntu:" = that user plus its login group).
sudo touch /var/log/myzookeeper.log
sudo chown ubuntu: /var/log/myzookeeper.log
# A proper service file should replace this eventually; nohup is OK for now
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > /var/log/myzookeeper.log 2>&1 &
### Configure Kafka Broker ###
KAFKADATA=/tmp/kafka-logs # Change log.dirs in server.properties and put the new loc here
mkdir -p "$KAFKADATA"
# Manual edits to make in ./config/server.properties before starting the broker:
# Change "num.partitions" to 3 (default for new topics) in server.properties
# Set "zookeeper.connect" to CSV of all 3 servers (port 2181)
# Set "broker.id" to "n" (diff number for each server)
# Pre-create the log file as root, then hand it to the unprivileged user that
# runs the broker ("ubuntu:" = that user plus its login group).
sudo touch /var/log/mykafka.log
sudo chown ubuntu: /var/log/mykafka.log
# Create a service file, this is for testing
nohup ./bin/kafka-server-start.sh ./config/server.properties > /var/log/mykafka.log 2>&1 &
==== Kafka (kRaft) in a VM ====
Install SDKMAN with ( curl -s "https://get.sdkman.io" | bash ), then install Java with "sdk install java 25.0.3-tem". If a newer version exists and this page has not been updated, run "sdk list java" to see the available versions. Install Maven with "sdk install maven 3.9.16" and Gradle with "sdk install gradle 9.5.1".
Extract the [[https://www.apache.org/dyn/closer.lua/kafka/4.2.0/kafka_2.13-4.2.0.tgz?action=download|Kafka 4.2.0 tarball]].
Configure KRaft:
# config/kraft/server.properties
# NOTE(review): Kafka 4.x tarballs appear to ship this file as
# config/server.properties (no kraft/ subdirectory) - confirm against the
# extracted tarball.
# Each node acts as both broker and controller.
process.roles=broker,controller
# Change for every server. Keep this note on its own line: a properties file
# has no inline comments, so trailing text would become part of the value.
node.id=1
controller.quorum.voters=1@a.xyz:9093,2@b.xyz:9093,3@c.xyz:9093
# Default recommended is 6 (so each node gets 2 partitions)
num.partitions=3
# At least 2 nodes must be in-sync; change existing lines
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# Reinitialize the log storage after changing the default partition count.
# Prefer the config/kraft/ layout; fall back to config/server.properties when
# that file is absent.
# NOTE(review): the fallback is believed to be the Kafka 4.x layout - confirm.
KAFKA_CONFIG=./config/kraft/server.properties
[ -f "$KAFKA_CONFIG" ] || KAFKA_CONFIG=./config/server.properties
# On the 1st server a fresh cluster ID is generated and printed. On the 2nd &
# 3rd servers set KAFKA_CLUSTER_ID=<UUID printed on the 1st server> before
# running this, so the same ID is re-used when formatting their storage.
KAFKA_CLUSTER_ID="${KAFKA_CLUSTER_ID:-$(./bin/kafka-storage.sh random-uuid)}"
printf '%s\n' "$KAFKA_CLUSTER_ID"
./bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c "$KAFKA_CONFIG"
# Pre-create the log file and hand it to the user that runs the broker
sudo touch /var/log/mykafka.log && sudo chown ubuntu: /var/log/mykafka.log
# Create a service file, this is for testing
nohup ./bin/kafka-server-start.sh "$KAFKA_CONFIG" > /var/log/mykafka.log 2>&1 &
==== Kafka in Docker ====
# In Docker Compose
# Single Kafka node in KRaft mode (no ZooKeeper): one container that is both
# broker and controller.
services:
  kafka:
    image: apache/kafka:latest
    container_name: kafka
    ports:
      # Only the client listener is published; 9093 (controller) stays internal
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      # Clients are told to reach the broker at "kafka:9092", so they must be
      # able to resolve the hostname "kafka" (e.g. other services in this file).
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      # Replication settings are 1 because this file defines a single broker
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
===== Kafka CLI =====
# Broker list handed to every kafka-topics.sh call in this snippet
BOOTSTRAP_SERVERS=a.xyz:9092,b.xyz:9092,c.xyz:9092
# Show the topics that exist before creating anything
./bin/kafka-topics.sh --bootstrap-server="${BOOTSTRAP_SERVERS}" --list
# Create a test topic: 1 partition replicated to 3 brokers, with per-topic
# overrides passed as --config key=value pairs
./bin/kafka-topics.sh --bootstrap-server="${BOOTSTRAP_SERVERS}" \
  --create --topic topic1 \
  --partitions 1 --replication-factor 3 \
  --config retention.ms=-1 \
  --config retention.bytes=524288000 \
  --config max.message.bytes=30000000 \
  --config flush.messages=1
# List again to confirm the new topic shows up
./bin/kafka-topics.sh --bootstrap-server="${BOOTSTRAP_SERVERS}" --list
# Inspect the new topic in detail
./bin/kafka-topics.sh --bootstrap-server="${BOOTSTRAP_SERVERS}" --describe --topic topic1
# To change the replication factor for a topic, use the "kafka-reassign-partitions" utility
# NOTE(review): the original body of this here-document was lost from the page.
# The JSON below follows the reassignment-file format (topic, partition, list
# of broker IDs that should hold a replica) - adjust topic and IDs as needed.
cat > partcfg.json << 'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "topic1", "partition": 0, "replicas": [1, 2, 3] }
  ]
}
EOF
./bin/kafka-reassign-partitions.sh --bootstrap-server=a.xyz:9092,b.xyz:9092,c.xyz:9092 --reassignment-json-file partcfg.json --execute
# Change the configuration of an existing topic (replace topic1 with the topic name)
./bin/kafka-configs.sh --bootstrap-server=a.xyz:9092,b.xyz:9092,c.xyz:9092 --entity-type topics --entity-name topic1 --alter --add-config retention.ms=-1,retention.bytes=524288000
#
# See more at: https://kafka.apache.org/documentation/#topicconfigs