Kafka

Kafka

Kafka

安装

https://archive.apache.org/dist/kafka/2.2.0/

1
2
wget https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar xf kafka_2.12-2.2.0.tgz -C /usr/local/
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target
After=network.target
[Service]
Type=simple
ExecStart=/usr/local/kafka_2.12-2.2.0/bin/zookeeper-server-start.sh /usr/local/kafka_2.12-2.2.0/config/zookeeper.properties
ExecStop=/usr/local/kafka_2.12-2.2.0/bin/zookeeper-server-stop.sh
Restart=on-abnormal
User=root
Group=root
EnvironmentFile=/etc/default/zookeeper
[Install]
WantedBy=multi-user.target
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
 cat /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service

[Service]
Type=simple
ExecStart=/usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-2.2.0/config/server.properties
ExecStop=/usr/local/kafka_2.12-2.2.0/bin/kafka-server-stop.sh
Restart=on-abnormal
EnvironmentFile=/etc/default/kafka

[Install]
WantedBy=multi-user.target
1
2
3
4
\# cat /etc/default/zookeeper 
JAVA_HOME=/usr/local/jdk1.8.0_333
\# cat /etc/default/kafka 
JAVA_HOME=/usr/local/jdk1.8.0_333

列出topic

1
./kafka-topics.sh --list --zookeeper localhost:2181

列出消费groups

1
 ./kafka-consumer-groups.sh  --bootstrap-server "xxx:9092,xxx1:9092,xxx2:9092"   --list

列出retention信息

1
./kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name __consumer_offsets --describe

列出group消费情况

1
./kafka-consumer-groups.sh  --bootstrap-server "xxx:9092,xxx1:9092,xxx2:9092" --group CONSUME_GROUP_NAME --describe

install_kafka.sh

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
#!/usr/bin/env bash
# used for install kafka by lqx at 20230703.
#
ip1=$1
ip2=$2
ip3=$3
# you are in 1, 2, or 3, when im=1, refers ip1
im=$4
[ -z $4 ] && { echo usage: bash $1 IP1 IP2 IP3 POSITION && exit 1; }
im_ip=$(eval echo '$'"ip$im")
echo "got im_ip $ip_ip"

[ -f kafka_2.12-2.2.0.tgz ] || wget https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar -zxvf kafka_2.12-2.2.0.tgz
mv /root/kafka_2.12-2.2.0/*  /opt/kafka
mkdir -p /opt/kafka/zookeeper/data
mkdir -p /opt/kafka/kafka-logs
cat <<EOF > /opt/kafka/config/zookeeper.properties
dataDir=/opt/kafka/zookeeper/data
clientPort=2181
maxClientCnxns=0
server.1=$ip1:2888:3888
server.2=$ip2:2888:3888
server.3=$ip3:2888:3888
initLimit=5
syncLimit=2
EOF
echo $im > /opt/kafka/zookeeper/data/myid
cat <<EOF > /opt/kafka/config/server.properties
broker.id=$im
host.name=$im_ip
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://$im_ip:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.cleaner.enable=true
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=$ip1:2181,$ip2:2181,$ip3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
EOF

adduser kafka
chown -R kafka:kafka /opt/kafka

echo "JAVA_HOME=$JAVA_HOME" >> /etc/default/zookeeper
echo "JAVA_HOME=$JAVA_HOME" >> /etc/default/kafka

cat <<EOF > /etc/systemd/system/zookeeper.service
[Unit]
Description=ZooKeeper for Kafka
After=network.target
[Service]
Type=forking
User=kafka
Restart=on-failure
LimitNOFILE=16384:163840
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
EnvironmentFile=/etc/default/zookeeper
[Install]
WantedBy=multi-user.target
EOF

cat <<EOF > /etc/systemd/system/kafka.service
[Unit]
Description=Kafka Broker
After=network.target
After=zookeeper.service
[Service]
Type=forking
User=kafka
SyslogIdentifier=kafka (%i)
Restart=on-failure
LimitNOFILE=16384:163840
ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
EnvironmentFile=/etc/default/kafka
[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable zookeeper
systemctl enable kafka

systemctl start zookeeper

sed  -i  "s/1G/4G/g" /opt/kafka/bin/kafka-server-start.sh
systemctl start kafka
systemctl status kafka

ps aux|grep zookekeepr
ps aux|grep kafka

参考文档