User Tools

Site Tools


kafka

Kafka

Kafka Broker

Kafka (with ZooKeeper) in a VM

### Install JDK ###

JAVAVERSION=17
# Run only the install commands that match your distribution.
# For Ubuntu
sudo apt update
sudo apt install -y "openjdk-${JAVAVERSION}-jdk" net-tools
# For RedHat
sudo dnf install -y "java-${JAVAVERSION}-openjdk" net-tools

### Download and extract Kafka ###

SCALA_VERSION=2.13
KAFKA_VERSION=3.7.1
KAFKA_DIST="kafka_${SCALA_VERSION}-${KAFKA_VERSION}"
DOWNLOAD_SITE=downloads.apache.org
# If the release is no longer on the main download site, use the archive instead
#DOWNLOAD_SITE=archive.apache.org/dist
# -f makes curl fail on an HTTP error instead of saving the error page as the .tgz
curl -fOL "https://${DOWNLOAD_SITE}/kafka/${KAFKA_VERSION}/${KAFKA_DIST}.tgz"
tar xzf "${KAFKA_DIST}.tgz"
cd "${KAFKA_DIST}"

### Configure ZooKeeper ###

# Append the ensemble settings to the ZooKeeper config shipped with Kafka
cat >> ./config/zookeeper.properties << EOF
tickTime=1000
initLimit=3600
syncLimit=15
# Below are the 3 VM hostnames with Zookeeper leader-connection and election ports
server.1=a.xyz:2888:3888
server.2=b.xyz:2888:3888
server.3=c.xyz:2888:3888
EOF

ZOOKEEPERDATA=/tmp/zookeeper  # Change dataDir in zookeeper.properties and put the new loc here
mkdir -p "$ZOOKEEPERDATA"
echo "1" > "$ZOOKEEPERDATA/myid"  # Use the ID defined in the "server.n" from zookeeper.properties

# Pre-create the log file as root, then hand it to the current user so the
# redirect below (which runs without sudo) is allowed to write to it.
# "user:" (with a colon) is the supported separator; "user." is deprecated.
sudo touch /var/log/myzookeeper.log
sudo chown "$(id -un):" /var/log/myzookeeper.log

# A proper service file should replace this; nohup is OK for now
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > /var/log/myzookeeper.log 2>&1 &


### Configure Kafka Broker ###

KAFKADATA=/tmp/kafka-logs  # Change log.dirs in server.properties and put the new loc here
mkdir -p "$KAFKADATA"

# Change "num.partitions" to 3 (default for new topics) in server.properties
# Set "zookeeper.connect" to CSV of all 3 servers (port 2181)
# Set "broker.id" to "n" (diff number for each server)

# Pre-create the log file as root, then hand it to the current user so the
# redirect below (which runs without sudo) is allowed to write to it.
# "user:" (with a colon) is the supported separator; "user." is deprecated.
sudo touch /var/log/mykafka.log
sudo chown "$(id -un):" /var/log/mykafka.log

# A proper service file should replace this; nohup is for testing
nohup ./bin/kafka-server-start.sh ./config/server.properties > /var/log/mykafka.log 2>&1 &

Kafka (KRaft) in a VM

Install Java with SDKMAN ( curl -s "https://get.sdkman.io" | bash ) and "sdk install java 25.0.3-tem" (if there's a newer version and this page has not been updated, use "sdk list java" to get the list). Install Maven with "sdk install maven 3.9.16" and Gradle with "sdk install gradle 9.5.1".

Extract the Kafka 4.2.0 tarball.

Configure KRaft

# config/kraft/server.properties
# NOTE(review): newer Kafka releases (4.x) are expected to ship this file as
# config/server.properties - confirm the path in the extracted tarball.
process.roles=broker,controller
# Change node.id for every server. Keep this comment on its own line:
# .properties files have no inline comments, so text after the value
# would become part of the value.
node.id=1
controller.quorum.voters=1@a.xyz:9093,2@b.xyz:9093,3@c.xyz:9093

#Default recommended is 6 (so each node gets 2 partitions)
num.partitions=3

#At least 2 nodes must be in-sync; change existing lines
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# Re-initialize the log storage after changing the default partition count.
# On the 1st server, leave KAFKA_CLUSTER_ID unset so a new UUID is generated.
# On the 2nd & 3rd servers, set KAFKA_CLUSTER_ID to the UUID printed on the
# 1st server BEFORE running this, so all nodes are formatted with the same ID.
KAFKA_CLUSTER_ID="${KAFKA_CLUSTER_ID:-$(./bin/kafka-storage.sh random-uuid)}"
echo "$KAFKA_CLUSTER_ID"

# NOTE(review): on Kafka 4.x the config is expected at ./config/server.properties
# rather than ./config/kraft/ - confirm the path in the extracted tarball.
./bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c ./config/kraft/server.properties

# Pre-create the log file as root, then hand it to the current user so the
# redirect below (which runs without sudo) is allowed to write to it.
sudo touch /var/log/mykafka.log && sudo chown "$(id -un):" /var/log/mykafka.log

# A proper service file should replace this; nohup is for testing
nohup ./bin/kafka-server-start.sh ./config/kraft/server.properties > /var/log/mykafka.log 2>&1 &

Kafka in Docker

# In Docker Compose
# Single-container Kafka: the one node below is configured with both the
# broker and the controller role.
services:
  kafka:
    image: apache/kafka:latest
    container_name: kafka
    ports:
      # Only the client listener port is published; 9093 is not exposed
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      # PLAINTEXT (9092) and CONTROLLER (9093) listeners
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      # NOTE(review): clients are told to connect to the hostname "kafka";
      # presumably they must be able to resolve it - confirm for host-side clients
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # The only quorum voter is this node (ID 1)
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      # Replication settings are all 1 to match the single node defined here
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

Kafka CLI

# Show the topics that currently exist
./bin/kafka-topics.sh \
  --bootstrap-server=a.xyz:9092,b.xyz:9092,c.xyz:9092 \
  --list

# Create a test topic: 1 partition, replication factor 3, plus per-topic config overrides
./bin/kafka-topics.sh \
  --bootstrap-server=a.xyz:9092,b.xyz:9092,c.xyz:9092 \
  --create --topic topic1 \
  --partitions 1 --replication-factor 3 \
  --config retention.ms=-1 \
  --config retention.bytes=524288000 \
  --config max.message.bytes=30000000 \
  --config flush.messages=1

# List again to confirm the new topic shows up
./bin/kafka-topics.sh \
  --bootstrap-server=a.xyz:9092,b.xyz:9092,c.xyz:9092 \
  --list

# Show the details of the topic
./bin/kafka-topics.sh \
  --bootstrap-server=a.xyz:9092,b.xyz:9092,c.xyz:9092 \
  --describe --topic topic1

# To change the replication factor for a topic, use the "kafka-reassign-partitions" utility.
# Write one entry per partition that actually exists (check with --describe);
# "replicas" lists the broker/node IDs that should hold that partition.
# NOTE(review): this example lists partitions 0-2, so it fits a topic with 3 partitions;
# trim it for a topic created with --partitions 1.
# The quoted 'EOF' keeps the shell from expanding anything inside the JSON.
cat > partcfg.json <<'EOF'
{"version":1,
  "partitions":[
     {"topic":"topic1","partition":0,"replicas":[1,2,3]},
     {"topic":"topic1","partition":1,"replicas":[1,2,3]},
     {"topic":"topic1","partition":2,"replicas":[1,2,3]}
]}
EOF
# Run the script from the Kafka directory like the other tools (./bin/<tool>.sh)
./bin/kafka-reassign-partitions.sh --bootstrap-server=a.xyz:9092,b.xyz:9092,c.xyz:9092 --reassignment-json-file partcfg.json --execute

# To change any other config for an existing topic
# ./bin/kafka-configs.sh --bootstrap-server <host:port> --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=-1,retention.bytes=524288000
#

# See more at: https://kafka.apache.org/documentation/#topicconfigs
kafka.txt · Last modified: by reddy

Donate Powered by PHP Valid HTML5 Valid CSS Driven by DokuWiki