logstash kafka elasticsearch 亿万级日志数据实时分析平台

<h3> 服务器环境 </h3>
server1: 10.1.11.199
server2: 10.1.11.6
server3: 10.1.11.64
三个服务器分别启动 hangout,elasticsearch,kafka,zookeeper

<h2> 工作原理 </h2>

在所需要收集日志的服务器logstash-agent,启动并将logstash output 定义为kafka。 做为kakfa消息来源的product。 之后在三台日志服务器上。分别启动hangout。作为kafka consumer 将日志提取,存入到elasticsearch 中。

<h3> 架构改变原因 </h3>
第一版 日志平台 所用软件 分别为 logstash-> redis -> logstash -> elasticsearch, 第二版整体架构有很大的改变。 将redis替换为kafka集群,将logstash index 替换为 hangout。
<h3> 更换 redis 为kafka </h3>

redis

* 优点
1. 简单,真的很简单,却相当的灵活。
2. 数据结构丰富
3. 采用内存作为数据存储。 高速读写

* 缺点
1. redis 使用纯内存,存储成本高。容易造成服务器资源不足
2. 数据量大,队列容易造成挤压,出现延迟
3. 2.4版本不能做到集群模式,无法将数据有效的分配到每个redis节点

kafka

* 优点
1. 采用分布式集群分片存储模式,效率高
2. 设计模式优秀,完全能够胜任海量数据消息队列存储
* 缺点
1. 采用文件作为存储,容易造成IO压力过大
2. 开启集群模式,需要额外维护Zookeeper

<h3> 更换 logstash 为 hangout </h3>

logstash

* 优点
1. 配置简单,真的很简单,却相当的灵活。
2. 插件丰富

* 缺点
1. 采用 filter 和grok 做为过滤机制效率低,容易造成CPU 压力高

hangout

* 优点
1. 基于logstash 开发。CPU使用率比logstash低2-3倍。性能比logstash 高2倍
* 缺点
1. 支持的插件目前只有kafka和elasticsearch

<h2> 服务安装 </h2>

kafka安装

环境

10.1.11.6 zk1
10.1.11.64 zk2
10.1.11.199 zk3

软件版本

zookeeper-3.4.6.tar.gz
kafka-0.8.1.1-src.tgz

部署

安装 zookeeper 和 kafka

* 下载

wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz

* 修改环境变量
vi /etc/profile

export KAFKA_HOME=/export/home/elk/kafka_2.8.0-0.8.1.1
export ZK_HOME=/export/home/elk/zookeeper-3.4.6
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$KAFKA_HOME/bin:$ZK_HOME/bin:$PATH

* 创建 zookeeper  data dir

mkdir /export/home/db/zookeeper

* 修改zookeeper 配置文件

cd $ZK_HOME/conf
mv zoo_sample.cfg zoo.cfg

* vim zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/export/home/db/zookeeper
# the port at which the clients will connect
clientPort=2181
# zookeeper cluster
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

* 在每一台机器上面生成 myid

zk1

echo "1" > /export/home/db/zookeeper/myid

zk2

echo "2" > /export/home/db/zookeeper/myid

zk3

echo "3" > /export/home/db/zookeeper/myid

* 启动 zookeeper

[root@ip-10-1-11-6 conf]# /export/home/elk/zookeeper-3.4.6/bin/zkServer.sh  start
JMX enabled by default
Using config: /export/home/elk/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

* 安装 kafka

** zk1

vi $KAFKA_HOME/config/server.properties

broker.id=0
port=9092
host.name=zk1
advertised.host.name=zk1
...
num.partitions=2
...
zookeeper.connect=10.1.11.64:2181,10.1.11.6:2181,10.1.11.199:2181

** zk2

vi $KAFKA_HOME/config/server.properties

broker.id=1
port=9092
host.name=zk2
advertised.host.name=zk2
...
num.partitions=2
...
zookeeper.connect=10.1.11.64:2181,10.1.11.6:2181,10.1.11.199:2181

** zk3

vi $KAFKA_HOME/config/server.properties

broker.id=2
port=9092
host.name=zk3
advertised.host.name=zk3
...
num.partitions=2
...
zookeeper.connect=10.1.11.64:2181,10.1.11.6:2181,10.1.11.199:2181

* 三台机器分别启动 kafka

./kafka-server-start.sh /export/home/elk/kafka_2.8.0-0.8.1.1/config/server.properties &

kafka 扩容

* kafka的扩容难点:
1)主要在于增加机器之后,数据需要rebalance到新增的空闲节点,即把partitions迁移到空闲机器上。kafka提供了bin/kafka-reassign-partitions.sh工具,完成parttition的迁移。
2)kafka的集群的数据量加大,数据rebalance的时间较长。解决办法是把log.retention.hours=1设置一小时(生产参数24小时)。修改参数之后,重启kakfa节点,kafka会主动purge 1小时之前的log数据。

* 扩容过程

####  验证kafka节点是否正常加入集群。

zkCli.sh > ls /kafka/brokers/ids/

2. purge数据,使数据迁移更加快速

1) 替换retent.time,只保留最近一个小时的数据。主要是为了方面topic数据快速迁移。

sed -i 's/log.retention.hours=24/log.retention.hours=1/g' /apps/conf/kafka/server.properties

2)关闭kafka

jps -ml |grep 'kafka.Kafka' | awk '{print $1}' |xargs kill -9

3)验证kafka进程

jps -ml |grep 'kafka.Kafka' | awk '{print $1}'

4)启动

kafka-server-start.sh -daemon server.properties
jps -ml |grep 'kafka.Kafka' | awk '{print $1}'
增加partitions 因为增加节点,物理机器机器更多,需要增加partition的个数。
bin/kafka-topics.sh --zookeeper 10.1.11.6:2181 --alter --topic all --partitions 24

重新分配parttion(reassign partitions)

1)获取所有的topic

kafka-topics.sh --list --zookeeper 10.1.11.6:2181

2) reassign partitions**
生成需要迁移的topic partitions信息,broker-list为所有的节点,包括新增节点。

./bin/kafka-reassign-partitions.sh --broker-list "1,2,3" --topics-to-move-json-file move.json --zookeeper 10.1.11.6:2181 --generate

其中topics的json文件内容为:

{"topics": [{"topic": "logstash-product"}], "version":1 }

3)使用上一步生成的建议partition json内容进行完成迁移
“Proposed partition reassignment configuration”后面的内容保存到reassign.json文件中

bin/kafka-reassign-partitions.sh --broker-list "1,2,3" --reassignment-json-file reassign.json --zookeeper 10.1.11.6:2181 --execute

4)修改参数,重启kafka

sed -i 's/log.retention.hours=1/log.retention.hours=24/g' /apps/conf/kafka/server.properties

安装 elasticsearch

* 下载

wget http://soft.99ya.net/static/elk/elasticsearch-2.2.0.tar.gz

* 安装

tar zxf elasticsearch-2.2.0.tar.gz
cd elasticsearch-2.2.0

* 修改配置文件
vim config/elasticsearch.yml

cluster.name: elasticsearch
node.name: 10.1.11.199
network.host: 10.1.11.199
discovery.zen.ping.unicast.hosts: ["10.1.11.64", "10.1.11.199","10.1.11.6"]
discovery.zen.minimum_master_nodes: 2
node.max_local_storage_nodes: 3

* 启动

useradd elasticsearch
chown -R elasticsearch:elasticsearch elasticsearch-2.2.0
su elasticsearch
./bin/elasticsearch -d

安装 hangout

* 下载

wget http://soft.99ya.net/static/elk/hangout-0.1.4-ES2.2.tgz

* 安装

tar zxf hangout-0.1.4-ES2.2.tgz
cd hangout-0.1.4-ES2.2

* 修改配置文件
vim conf/hangout_server.conf

inputs:
- Kafka:
codec: json
topic:
logstash-product: 4
consumer_settings:
group.id: logstash
zookeeper.connect: 10.1.11.6:2181,10.1.11.199:2181,10.1.11.64:2181
auto.commit.interval.ms: "5000"
outputs:
- Elasticsearch:
cluster: elasticsearch
hosts:
- 10.1.11.6
- 10.1.11.64
- 10.1.11.199
index: 'logstash-%{+YYYY.MM.dd}'
index_type: logs # default logs
timezone: "Asia/Shanghai" # defaut UTC 时区. 只用于生成索引名字的字符串格式化

* 启动

./bin/hangout -f conf/hangout_server.conf

<h2>FAQ</h2>

* logstash 配置中的multiline 为什么要写在agent端
如果 multiline 写在 server端,则因为有这个规则存在 logstash 默认只能启动一个进程。影响效率

未经允许不得转载:99ya » logstash kafka elasticsearch 亿万级日志数据实时分析平台

2

  1. #1

    These are genuinely wonderful ideas in concerning
    blogging. You have touched some good things here. Any way keep up wrinting.
    Its like you read my thoughts! You appear to grasp a lot approximately this,
    like you wrote the e book in it or something. I think that you simply can do with a few percent to
    power the message house a little bit, however instead
    of that, that is wonderful blog. A great read.
    I will definitely be back. I am sure this article has touched all the
    internet people, its really really fastidious article on building up new webpage.
    http://foxnews.co.uk

    匿名8个月前 (12-27)