Journey to Planet Meow - The Naughty Elephant - Using Flume

https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#setting-multi-agent-flow

Official example: monitoring port data

Use Flume to listen on a port, collect the data arriving at that port, and print it to the console.

Create a job folder under the flume directory and cd into it.

mkdir -p /opt/bunny/flume/job/simpleCase
cd /opt/bunny/flume/job/simpleCase

Write the configuration file (strip the inline comments before running!!! Flume reads this as a Java properties file, so a trailing # ... becomes part of the value):

vim flume-1-netcat-logger.conf

# Name the components on this agent
a1.sources = r1 # name agent a1's source r1; separate multiple components with spaces
a1.sinks = k1 # name agent a1's sink k1; separate multiple components with spaces
a1.channels = c1 # name agent a1's channel c1; separate multiple components with spaces

# Describe/configure the source
a1.sources.r1.type = netcat # r1's type
a1.sources.r1.bind = h102 # the address r1 binds to
a1.sources.r1.port = 44444 # the port r1 listens on

# Describe the sink
a1.sinks.k1.type = logger # k1's type is logger: log events to the console

# Use a channel which buffers events in memory
a1.channels.c1.type = memory # c1's type is memory
a1.channels.c1.capacity = 1000 # c1 holds at most 1000 events
a1.channels.c1.transactionCapacity = 100 # c1 moves at most 100 events per transaction

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 # which channel(s) r1 writes to (a source can feed several)
a1.sinks.k1.channel = c1 # which channel k1 reads from (a sink has exactly one)

Deploy and run Flume to listen on the port.

cd /opt/bunny/flume

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console

Use the netcat tool to send data to port 44444, connecting to the address the source is bound to (h102 here), and watch the events appear in the Flume console.

nc h102 44444

Whatever you type in this window shows up on the listening side.

Real-time monitoring of multiple appended files in a directory

Which source component should we use?
① Exec source: fits tailing a single file that is being appended to in real time, but cannot resume from a checkpoint;
② Spooldir source: fits syncing whole new files, but not tailing files that are still being appended to;
③ Taildir source: fits tailing multiple appended files at once, and supports checkpoint resume.

Use Flume to watch files being appended under an entire directory and upload them to HDFS.

Under the flume root directory, create datas/tailCase/files and datas/tailCase/logs to hold the data files.

cd /opt/bunny/flume
mkdir -p datas/tailCase/files datas/tailCase/logs

Write the configuration file:
in the job/simpleCase directory, create the configuration file flume-2-taildir-hdfs.conf.

cd /opt/bunny/flume/job/simpleCase
vim flume-2-taildir-hdfs.conf

Edit it with the following content.

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = TAILDIR
a2.sources.r1.positionFile = /opt/bunny/flume/tail_dir.json
a2.sources.r1.filegroups = f1 f2
a2.sources.r1.filegroups.f1 = /opt/bunny/flume/datas/tailCase/files/.*file.*
a2.sources.r1.filegroups.f2 = /opt/bunny/flume/datas/tailCase/logs/.*log.*

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://h102:8020/flume/tailDir/%Y%m%d/%H
# prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = tail-
# whether to roll directories by time
a2.sinks.k1.hdfs.round = true
# how many time units per new directory
a2.sinks.k1.hdfs.roundValue = 1
# the time unit: second, minute, or hour
a2.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# how many events to accumulate before one flush to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# file type (CompressedStream supports compression, DataStream does not)
a2.sinks.k1.hdfs.fileType = DataStream
# how often to roll a new file, in seconds
a2.sinks.k1.hdfs.rollInterval = 60
# roll each file at roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# rolling is independent of the event count
a2.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Start Flume to watch the directories.

cd /opt/bunny/flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/simpleCase/flume-2-taildir-hdfs.conf

Test

Under /opt/bunny/flume/datas/tailCase/files, create files and append content to them:

cd /opt/bunny/flume/datas/tailCase/files
touch file1.txt
echo I am file1 >> file1.txt
touch log1.txt
echo I am log1 >> log1.txt

Under /opt/bunny/flume/datas/tailCase/logs, create files and append content to them:

cd /opt/bunny/flume/datas/tailCase/logs

touch file2.txt
echo I am file2 >> file2.txt
touch log2.txt
echo I am log2 >> log2.txt

Check the data on HDFS to verify that Flume collects from files under multiple directories in real time:
http://192.168.93.102:9870/
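Alternatively, if you prefer the command line to the web UI (assuming the Hadoop client is on the PATH), something like the following also works:

hadoop fs -ls -R /flume/tailDir
hadoop fs -cat '/flume/tailDir/*/*/tail-*'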

Stop the Flume agent, append to the files under logs/ and files/, then restart the agent to verify Flume's checkpoint resume.
// stop the Flume agent first

cat /opt/bunny/flume/tail_dir.json       # inspect the position file
cd /opt/bunny/flume/datas/tailCase/files
echo I am file1 duandian >> file1.txt
cd /opt/bunny/flume/datas/tailCase/logs
echo I am log2 xuchuan >> log2.txt
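The position file is how the Taildir source implements checkpoint resume: it is a JSON array with one entry per tailed file, recording the file's inode, the byte offset already consumed (pos), and the absolute path. A sketch of its contents, with illustrative values only:

[{"inode":522733,"pos":42,"file":"/opt/bunny/flume/datas/tailCase/files/file1.txt"},{"inode":522734,"pos":30,"file":"/opt/bunny/flume/datas/tailCase/logs/log2.txt"}]

After a restart, Flume resumes reading each file from the recorded pos instead of re-emitting the whole file.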

Single data source, multiple outputs

Flume-1 monitors a file for changes.
· Flume-1 passes every change to Flume-2, and Flume-2 stores it on HDFS;
· at the same time, Flume-1 passes every change to Flume-3, and Flume-3 writes it to the local file system.

Preparation

Under /opt/bunny/flume/job, create the folder enterprise/copy to hold the configuration files for this replication example.
mkdir -p /opt/bunny/flume/job/enterprise/copy

Under /opt/bunny/flume/datas/, create the mock log file realtime.log.
touch /opt/bunny/flume/datas/realtime.log

Configuration files

flume-1's agent configuration file is flume-1-exec-avro.conf.
It configures one source with two channels and two sinks, feeding flume-2-avro-hdfs and flume-3-avro-file respectively.

cd /opt/bunny/flume/job/enterprise/copy
vim flume-1-exec-avro.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# replicate the data flow to every channel (replicating is in fact the default)
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/bunny/flume/datas/realtime.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# an avro sink is a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = h102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Write flume-2's agent configuration file, flume-2-avro-hdfs.conf, and then flume-3's, flume-3-avro-file.conf; flume-3 takes Flume-1's output and writes it to the local directory /opt/bunny/flume/datas/copy_result.

vim flume-2-avro-hdfs.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# an avro source is a data-receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = h102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://h102:8020/flume/copy/%Y%m%d/%H
# prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = copy-
# whether to roll directories by time
a2.sinks.k1.hdfs.round = true
# how many time units per new directory
a2.sinks.k1.hdfs.roundValue = 1
# the time unit
a2.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# how many events to accumulate before one flush to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# file type; compression is available via CompressedStream
a2.sinks.k1.hdfs.fileType = DataStream
# how often to roll a new file, in seconds
a2.sinks.k1.hdfs.rollInterval = 60
# roll each file at roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# rolling is independent of the event count
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
vim flume-3-avro-file.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = h102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/bunny/flume/datas/copy_result

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

Run

Create the local directory.

mkdir /opt/bunny/flume/datas/copy_result

The local output directory must already exist; if it does not, the sink will not create it for you.

Run Flume and start collecting. The start order is downstream agents first, then upstream.

cd /opt/bunny/flume/
bin/flume-ng agent -c conf/ -n a3 -f /opt/bunny/flume/job/enterprise/copy/flume-3-avro-file.conf
bin/flume-ng agent -c conf/ -n a2 -f /opt/bunny/flume/job/enterprise/copy/flume-2-avro-hdfs.conf
bin/flume-ng agent -c conf/ -n a1 -f /opt/bunny/flume/job/enterprise/copy/flume-1-exec-avro.conf
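Each of these commands occupies its terminal in the foreground, so use three sessions; alternatively, background an agent, for example (a sketch, with an arbitrary log path):

nohup bin/flume-ng agent -c conf/ -n a3 -f job/enterprise/copy/flume-3-avro-file.conf > /tmp/flume-a3.log 2>&1 &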

Append to the file to simulate a log being updated in real time.

cd /opt/bunny/flume/datas/
echo 1999-10-31 09-09-09 >> realtime.log

Check the data files on HDFS.

Check the data in the /opt/bunny/flume/datas/copy_result directory.

cd /opt/bunny/flume/datas/copy_result

ll

Note: when the file_roll sink writes to local disk, the local files roll over on a time schedule, so an empty file is created for an interval even if no data was collected during it.
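If those empty files get in the way, the file_roll sink's roll interval is configurable (it defaults to 30 seconds), and setting it to 0 disables time-based rolling so everything goes into a single file, e.g.:

a3.sinks.k1.sink.rollInterval = 0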

Using multiplexing and interceptors

We use Flume to collect port log data from a server, and the logs must be routed to different analysis systems according to their type.

In real development, one server can produce many types of logs, and different types may need to go to different analysis systems.
This is where the Multiplexing structure of Flume's channel selector comes in.
Multiplexing works like this: based on the value of some key in an event's header, different events are sent to different channels.
A custom interceptor's job is to assign different values to that header key for different types of events.

In this example we simulate logs with port data and use digits and letters to stand for different log types. We need a custom interceptor to tell digits from letters and send them to different channels (analysis systems).

Create a Maven project, package it, and ship the jar to Flume

Create a Maven project and pull in the following dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

Define a CustomInterceptor class implementing the Interceptor interface.

package cn.kittybuny.bigdata;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 1. get the data from the event
        byte[] body = event.getBody();
        // 2. check whether the first character is a letter or a digit
        if (body[0] >= 'a' && body[0] <= 'z') {
            event.getHeaders().put("type", "letter"); // letter: set header key "type" to "letter"
        } else if (body[0] >= '0' && body[0] <= '9') {
            event.getHeaders().put("type", "number"); // digit: set header key "type" to "number"
        }
        // 3. return the event
        return event;
    }

    // intercept a batch of events
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    // builder that constructs the interceptor
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

Package the project and drop the jar into Flume's lib directory.
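For example (the jar name depends on your pom's artifactId and version; the one below is hypothetical):

mvn clean package
cp target/flume-interceptor-1.0.jar /opt/bunny/flume/lib/

Since flume-ng-core is already on the agent's classpath under lib/, you can also mark the dependency as provided so your jar stays slim.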

Edit the configuration files

Write the flume-1 configuration file.
On h102, under /opt/bunny/flume/job/, create the folder custom/multi to hold this example's configuration files.
mkdir -p /opt/bunny/flume/job/custom/multi

On h102, configure Flume-1 with one netcat source and two avro sinks, together with the matching channel selector and the custom interceptor.

vim /opt/bunny/flume/job/custom/multi/flume-1-netcat-avro.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.kittybuny.bigdata.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h103
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = h104
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
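One caveat with this mapping: an event whose first byte is neither a lowercase letter nor a digit gets no type header from our interceptor and therefore matches no mapping. The multiplexing selector supports a default channel for such events; a possible addition:

a1.sources.r1.selector.default = c2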

Write the flume-2 and flume-3 configuration files:
give flume-2 on h103 and flume-3 on h104 each one avro source and one logger sink.

vim /opt/bunny/flume/job/custom/multi/flume-2-avro-logger.conf

# agent
a2.sources=r1
a2.sinks = k1
a2.channels = c1

# source
a2.sources.r1.type = avro
a2.sources.r1.bind = h103
a2.sources.r1.port = 4141

# sink
a2.sinks.k1.type = logger

# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# bind
a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1
vim /opt/bunny/flume/job/custom/multi/flume-3-avro-logger.conf

# agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# source
a3.sources.r1.type = avro
a3.sources.r1.bind = h104
a3.sources.r1.port = 4242

# sink
a3.sinks.k1.type = logger

# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# bind
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1

Sync the data

xsync /opt/bunny/flume/
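xsync here is the custom cluster-distribution script used throughout this series; if you don't have it, plain rsync to the other hosts does the same job, e.g.:

rsync -av /opt/bunny/flume/ h103:/opt/bunny/flume/
rsync -av /opt/bunny/flume/ h104:/opt/bunny/flume/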

Start the processes and verify

flume-2 on h103

cd /opt/bunny/flume/
bin/flume-ng agent -c conf/ -n a2 -f /opt/bunny/flume/job/custom/multi/flume-2-avro-logger.conf -Dflume.root.logger=INFO,console

flume-3 on h104

cd /opt/bunny/flume/
bin/flume-ng agent -c conf/ -n a3 -f /opt/bunny/flume/job/custom/multi/flume-3-avro-logger.conf -Dflume.root.logger=INFO,console

flume-1 on h102

cd /opt/bunny/flume/
bin/flume-ng agent -c conf/ -n a1 -f /opt/bunny/flume/job/custom/multi/flume-1-netcat-avro.conf

h102

nc localhost 44444

The results can now be viewed on h103 and h104.

Aggregation

flume-1 on h102 tails the file /opt/bunny/flume/datas/realtime.log,
flume-2 on h103 listens for a data stream on a port,
and flume-3 on h104 receives the data from both flume-1 and flume-2 and prints the final result to the console.

Configuration files

mkdir /opt/bunny/flume/job/enterprise/juhe

flume-1 configuration file

vim /opt/bunny/flume/job/enterprise/juhe/flume-1-exec-avro.conf 

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/bunny/flume/datas/realtime.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume-2 configuration file

vim /opt/bunny/flume/job/enterprise/juhe/flume-2-netcat-avro.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = h103
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = h104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume-3 configuration file

vim /opt/bunny/flume/job/enterprise/juhe/flume-3-avro-logger.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = h104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

Sync the files

xsync /opt/bunny/flume

Run and verify

h104

/opt/bunny/flume/bin/flume-ng agent -c conf/ -n a3 -f /opt/bunny/flume/job/enterprise/juhe/flume-3-avro-logger.conf -Dflume.root.logger=INFO,console

h103

/opt/bunny/flume/bin/flume-ng agent -c conf/ -n a2 -f /opt/bunny/flume/job/enterprise/juhe/flume-2-netcat-avro.conf

h102

/opt/bunny/flume/bin/flume-ng agent -c conf/ -n a1 -f /opt/bunny/flume/job/enterprise/juhe/flume-1-exec-avro.conf

h102

echo 'hello' >> /opt/bunny/flume/datas/realtime.log

h103

nc h103 44444

View the result on h104.

Data flow monitoring
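Flume ships with a built-in JSON metrics reporter that exposes per-component counters (channel fill level, events put and taken, and so on) over HTTP. A minimal sketch using the first example's agent; the port 34545 is an arbitrary choice:

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/simpleCase/flume-1-netcat-logger.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

curl http://h102:34545/metrics

For cluster-wide dashboards, the same flag also accepts ganglia as a monitoring type.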
