====== Kafka ======

===== Kafka Broker =====

==== Kafka (with ZooKeeper) in a VM ====

<code bash>
### Install JDK ###
JAVAVERSION=17
# For Ubuntu
sudo apt update
sudo apt install -y openjdk-${JAVAVERSION}-jdk net-tools
# For RedHat
sudo dnf install -y java-${JAVAVERSION}-openjdk net-tools

### Download Kafka ###
SCALA_VERSION=2.13
KAFKA_VERSION=3.7.1
DOWNLOAD_SITE=downloads.apache.org
# Older releases are only available from the archive:
#DOWNLOAD_SITE=archive.apache.org/dist
curl -OL https://${DOWNLOAD_SITE}/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
tar xzf kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
cd kafka_${SCALA_VERSION}-${KAFKA_VERSION}

### Configure ZooKeeper ###
# tickTime is in milliseconds; initLimit and syncLimit are counted in ticks
cat >> ./config/zookeeper.properties << EOF
tickTime=1000
initLimit=3600
syncLimit=15
# The 3 VM hostnames, each with the ZooKeeper leader-connection and election ports
server.1=a.xyz:2888:3888
server.2=b.xyz:2888:3888
server.3=c.xyz:2888:3888
EOF
# If you change dataDir in zookeeper.properties, put the same location here
ZOOKEEPERDATA=/tmp/zookeeper
mkdir -p "$ZOOKEEPERDATA"
# Write this server's ID, as defined by its "server.n" line in zookeeper.properties
echo "1" > "$ZOOKEEPERDATA/myid"
# Replace "ubuntu" with the user that runs ZooKeeper
sudo touch /var/log/myzookeeper.log
sudo chown ubuntu: /var/log/myzookeeper.log
# A service file is better, but this is OK for now
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > /var/log/myzookeeper.log 2>&1 &

### Configure Kafka Broker ###
# If you change log.dirs in server.properties, put the same location here
KAFKADATA=/tmp/kafka-logs
mkdir -p "$KAFKADATA"
# In server.properties:
#   Set "num.partitions" to 3 (the default for new topics)
#   Set "zookeeper.connect" to a comma-separated list of all 3 servers (port 2181),
#     e.g. zookeeper.connect=a.xyz:2181,b.xyz:2181,c.xyz:2181
#   Set "broker.id" to "n" (a different number on each server)
sudo touch /var/log/mykafka.log
sudo chown ubuntu: /var/log/mykafka.log
# For testing only; create a service file for long-term use
nohup ./bin/kafka-server-start.sh ./config/server.properties > /var/log/mykafka.log 2>&1 &
</code>

==== Kafka (KRaft) in a VM ====

Install Java with SDKMAN! (''curl -s "https://get.sdkman.io" | bash'', then open a new shell) and run ''sdk install java 25.0.3-tem''. If a newer version is available and this page has not been updated, ''sdk list java'' shows the available versions. Install Maven with ''sdk install maven 3.9.16'' and Gradle with ''sdk install gradle 9.5.1''.

Extract the [[https://www.apache.org/dyn/closer.lua/kafka/4.2.0/kafka_2.13-4.2.0.tgz?action=download|Kafka 4.2.0 tarball]] and run the commands below from the extracted directory.

Configure KRaft in ''config/server.properties'' (Kafka 3.x releases use ''config/kraft/server.properties''):

<code properties>
process.roles=broker,controller
# Change for every server
node.id=1
controller.quorum.voters=1@a.xyz:9093,2@b.xyz:9093,3@c.xyz:9093
# Default partition count for new topics (6 is recommended, so that each node gets 2 partitions)
num.partitions=3
# Replicate internal topics to all 3 nodes, with at least 2 in sync (change the existing lines)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
</code>

Format the storage on each server and start Kafka:

<code bash>
# (Re)initialize the log storage after changing the default partition count
KAFKA_CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
echo "$KAFKA_CLUSTER_ID"
# On the 2nd and 3rd servers, set KAFKA_CLUSTER_ID to the 1st server's UUID instead of generating a new one
./bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c ./config/server.properties
# Replace "ubuntu" with the user that runs Kafka
sudo touch /var/log/mykafka.log && sudo chown ubuntu: /var/log/mykafka.log
# For testing only; create a service file for long-term use
nohup ./bin/kafka-server-start.sh ./config/server.properties > /var/log/mykafka.log 2>&1 &
</code>

==== Kafka in Docker ====

<code yaml>
# Docker Compose (e.g. docker-compose.yml)
services:
  kafka:
    image: apache/kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      # "kafka" only resolves inside the Compose network; clients on the host need PLAINTEXT://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
</code>

===== Kafka CLI =====

<code bash>
# List topics
./bin/kafka-topics.sh --bootstrap-server=a.xyz:9092,b.xyz:9092,c.xyz:9092 --list
# Create a topic for testing (retention.ms=-1: no time-based deletion; retention.bytes: 500 MiB per partition;
# max.message.bytes: about 30 MB; flush.messages=1: fsync after every message)
./bin/kafka-topics.sh --bootstrap-server=a.xyz:9092,b.xyz:9092,c.xyz:9092 --create --topic topic1 \
  --partitions 1 --replication-factor 3 \
  --config retention.ms=-1 --config retention.bytes=524288000 \
  --config max.message.bytes=30000000 --config flush.messages=1
# List topics again to confirm that topic1 exists
./bin/kafka-topics.sh --bootstrap-server=a.xyz:9092,b.xyz:9092,c.xyz:9092 --list
# See details for the topic
./bin/kafka-topics.sh --bootstrap-server=a.xyz:9092,b.xyz:9092,c.xyz:9092 --describe --topic topic1
# To change the replication factor for a topic, use the "kafka-reassign-partitions" utility
cat > partcfg.json < --entity-type topics --entity-name --alter --add-config retention.ms=-1,retention.bytes=524288000
# See more at: https://kafka.apache.org/documentation/#topicconfigs
</code>
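The ZooKeeper setup above writes the three ''server.n'' lines and each VM's ''myid'' by hand, and the two must agree. As a minimal sketch, assuming IDs are simply assigned 1, 2, 3 in host order, the hypothetical helpers ''zk_server_lines'' and ''zk_myid_for'' (not part of Kafka or ZooKeeper) generate both from a single host list:

```shell
# Hypothetical helpers (not part of Kafka): generate the ZooKeeper quorum lines
# and this server's myid from one list of hosts. IDs are 1, 2, 3, ... in list order.

# Print one "server.N=host:2888:3888" line per host.
zk_server_lines() {
  local id=1 host
  for host in "$@"; do
    printf 'server.%d=%s:2888:3888\n' "$id" "$host"
    id=$((id + 1))
  done
}

# Print the ID of TARGET within the host list; return 1 if it is not listed.
# Usage: zk_myid_for TARGET HOST...
zk_myid_for() {
  local target="$1" id=1 host
  shift
  for host in "$@"; do
    if [ "$host" = "$target" ]; then
      printf '%d\n' "$id"
      return 0
    fi
    id=$((id + 1))
  done
  return 1
}

# Example usage (assumes "hostname -f" returns the same names as the list):
#   zk_server_lines a.xyz b.xyz c.xyz >> ./config/zookeeper.properties
#   zk_myid_for "$(hostname -f)" a.xyz b.xyz c.xyz > "$ZOOKEEPERDATA/myid"
```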
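Both ''zookeeper.connect'' (port 2181) and the CLI's ''--bootstrap-server'' (port 9092) take the same comma-separated ''host:port'' form. A hypothetical ''join_host_ports'' helper, shown only as a convenience sketch, builds that list from the host names so it is typed once:

```shell
# Hypothetical helper (not part of Kafka): join hosts with one port into a
# comma-separated "host:port" list.
# Usage: join_host_ports PORT HOST...
join_host_ports() {
  local port="$1" host out=""
  shift
  for host in "$@"; do
    # Add a comma separator only after the first entry
    out="${out:+$out,}${host}:${port}"
  done
  printf '%s\n' "$out"
}

# Example usage:
#   echo "zookeeper.connect=$(join_host_ports 2181 a.xyz b.xyz c.xyz)"
#   BOOTSTRAP="$(join_host_ports 9092 a.xyz b.xyz c.xyz)"
#   ./bin/kafka-topics.sh --bootstrap-server="$BOOTSTRAP" --list
```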
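In the KRaft setup, ''controller.quorum.voters'' lists every controller as ''id@host:port'', and each ''id'' must match that server's ''node.id''. A sketch of a hypothetical ''kraft_voters'' helper that builds the value, assuming the same 1, 2, 3 numbering and controller port 9093 used above (''KRAFT_PORT'' is an invented override variable):

```shell
# Hypothetical helper (not part of Kafka): build controller.quorum.voters for a
# static KRaft quorum. Node IDs are 1, 2, 3, ... in host order and must match
# node.id on each server. The port defaults to 9093 (override with KRAFT_PORT).
kraft_voters() {
  local port="${KRAFT_PORT:-9093}" id=1 host out=""
  for host in "$@"; do
    out="${out:+$out,}${id}@${host}:${port}"
    id=$((id + 1))
  done
  printf '%s\n' "$out"
}

# Example usage:
#   echo "controller.quorum.voters=$(kraft_voters a.xyz b.xyz c.xyz)"
#   prints: controller.quorum.voters=1@a.xyz:9093,2@b.xyz:9093,3@c.xyz:9093
```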
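To change a topic's replication factor, ''kafka-reassign-partitions.sh'' reads a JSON plan (''--reassignment-json-file'') of the form ''{"version":1,"partitions":[...]}'', with one ''topic''/''partition''/''replicas'' entry per partition. The sketch below is a hypothetical ''reassign_json'' helper, assuming broker IDs 1 to 3 from ''broker.id''/''node.id''. It rotates the replica list for each partition so the preferred leaders (the first replica) are spread across brokers; that rotation is a design choice of this sketch, not something the tool requires.

```shell
# Hypothetical helper (not part of Kafka): print a partition reassignment plan
# that places every partition of TOPIC on all given brokers (replication factor
# = number of broker IDs), rotating the order per partition to spread leaders.
# Usage: reassign_json TOPIC PARTITION_COUNT BROKER_ID...
reassign_json() {
  local topic="$1" count="$2" p=0 id first replicas parts=""
  shift 2
  [ "$#" -gt 0 ] || return 1
  while [ "$p" -lt "$count" ]; do
    replicas=""
    for id in "$@"; do
      replicas="${replicas:+$replicas,}$id"
    done
    parts="${parts:+$parts,}{\"topic\":\"$topic\",\"partition\":$p,\"replicas\":[$replicas]}"
    # Rotate the broker list by one so the next partition gets a different leader
    first="$1"
    shift
    set -- "$@" "$first"
    p=$((p + 1))
  done
  printf '{"version":1,"partitions":[%s]}\n' "$parts"
}

# Example usage (e.g. to raise topic1 to 3 replicas on brokers 1, 2 and 3):
#   reassign_json topic1 1 1 2 3 > partcfg.json
#   ./bin/kafka-reassign-partitions.sh --bootstrap-server a.xyz:9092 \
#     --reassignment-json-file partcfg.json --execute
#   ./bin/kafka-reassign-partitions.sh --bootstrap-server a.xyz:9092 \
#     --reassignment-json-file partcfg.json --verify
```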
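The start commands on this page run under ''nohup'', with a note that a service file should replace them. A minimal sketch of a systemd unit generator follows; ''kafka_unit'' is a hypothetical helper, and the ''/opt/kafka'' install path and ''ubuntu'' user in the usage comment are assumptions. The start scripts stay in the foreground, so systemd can supervise them directly, and their output goes to the journal (''journalctl -u kafka'') instead of ''/var/log/mykafka.log''.

```shell
# Hypothetical helper (not part of Kafka): print a minimal systemd unit that runs
# a Kafka or ZooKeeper start script in the foreground.
# Usage: kafka_unit DESCRIPTION KAFKA_HOME START_SCRIPT CONFIG_FILE RUN_AS_USER
kafka_unit() {
  local desc="$1" home="$2" script="$3" config="$4" user="$5"
  cat <<EOF
[Unit]
Description=$desc
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=$user
WorkingDirectory=$home
ExecStart=$home/bin/$script $home/$config
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF
}

# Example usage (the /opt/kafka path and "ubuntu" user are assumptions):
#   kafka_unit "Apache ZooKeeper" /opt/kafka zookeeper-server-start.sh \
#     config/zookeeper.properties ubuntu | sudo tee /etc/systemd/system/zookeeper.service
#   kafka_unit "Apache Kafka" /opt/kafka kafka-server-start.sh \
#     config/server.properties ubuntu | sudo tee /etc/systemd/system/kafka.service
#   sudo systemctl daemon-reload && sudo systemctl enable --now zookeeper kafka
```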