https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#setting-multi-agent-flow
Monitoring port data (official example): use Flume to listen on a port, collect the data arriving at that port, and print it to the console.
Create a job folder under the Flume directory and enter it.
```shell
mkdir -p /opt/bunny/flume/job/simpleCase
cd /opt/bunny/flume/job/simpleCase
```
Write the configuration file (strip the inline comments before running! Flume takes everything after `=` on a property line as the value, so trailing comments break the configuration):
```shell
vim flume-1-netcat-logger.conf
```

```properties
# Name the components on this agent
a1.sources = r1     # name a1's Source component r1; separate multiple components with spaces
a1.sinks = k1       # name a1's Sink component k1; separate multiple components with spaces
a1.channels = c1    # name a1's Channel component c1; separate multiple components with spaces

# Describe/configure the source
a1.sources.r1.type = netcat    # type of r1
a1.sources.r1.bind = h102      # bind address of r1
a1.sources.r1.port = 44444     # listening port of r1

# Describe the sink
a1.sinks.k1.type = logger      # k1 is a logger sink, printing to the console

# Use a channel which buffers events in memory
a1.channels.c1.type = memory                 # c1 is a memory channel
a1.channels.c1.capacity = 1000               # capacity of c1: 1000 events
a1.channels.c1.transactionCapacity = 100     # transaction capacity of c1: 100 events

# Bind the source and sink to the channel
a1.sources.r1.channels = c1    # which channel(s) r1 writes to
a1.sinks.k1.channel = c1       # which channel k1 reads from
```
Deploy and run Flume to listen on the port.
```shell
cd /opt/bunny/flume
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console
```
In another terminal, use the netcat tool (`nc h102 44444`) to send content to port 44444 on this machine, and watch the Flume console for the received data. Anything typed in the netcat window shows up in the listening console.
Tailing multiple appended files under a directory in real time. Which Source component should we use?
① Exec source: suited to tailing a single file that is appended in real time, but cannot resume from a checkpoint;
② Spooldir source: suited to syncing newly arrived files, but not to tailing files that are still being appended;
③ Taildir source: suited to tailing multiple files that are appended in real time, and supports checkpoint resume.
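Taildir gets its checkpoint-resume ability from a position file. As a rough illustration (the exact field layout can vary between Flume versions), tail_dir.json holds one record per tracked file, with the file's inode, the byte offset read so far, and its path:

```json
[
  {"inode": 2496272, "pos": 11, "file": "/opt/bunny/flume/datas/tailCase/files/file1.txt"},
  {"inode": 2496275, "pos": 10, "file": "/opt/bunny/flume/datas/tailCase/logs/log2.txt"}
]
```

On restart the source looks up each file's saved offset and continues reading from there instead of from the beginning.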
Goal: use Flume to watch the real-time appended files under a whole directory and upload them to HDFS.
Under the Flume root directory, create datas/tailCase/files and datas/tailCase/logs to hold the data files.
```shell
cd /opt/bunny/flume
mkdir -p datas/tailCase/files datas/tailCase/logs
```
Write the configuration file: in the job/simpleCase directory, create flume-2-taildir-hdfs.conf

```shell
cd /opt/bunny/flume/job/simpleCase
vim flume-2-taildir-hdfs.conf
```

with the following content:
```properties
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = TAILDIR
a2.sources.r1.positionFile = /opt/bunny/flume/tail_dir.json
a2.sources.r1.filegroups = f1 f2
a2.sources.r1.filegroups.f1 = /opt/bunny/flume/datas/tailCase/files/.*file.*
a2.sources.r1.filegroups.f2 = /opt/bunny/flume/datas/tailCase/logs/.*log.*

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://h102:8020/flume/tailDir/%Y%m%d/%H
# prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = tail-
# whether to roll folders by time
a2.sinks.k1.hdfs.round = true
# how many time units before creating a new folder
a2.sinks.k1.hdfs.roundValue = 1
# the time unit: second, minute or hour
a2.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# how many events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# file type: CompressedStream (supports compression) or DataStream (no compression)
a2.sinks.k1.hdfs.fileType = DataStream
# how often to roll to a new file, in seconds
a2.sinks.k1.hdfs.rollInterval = 60
# roll size per file, just under 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# rolling is independent of the number of events
a2.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
```
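Each filegroup value is a fixed directory plus a filename regex. As a quick stand-alone check of which names each pattern would pick up (a sketch; plain grep stands in for Flume's matcher here):

```shell
# check which file names each filegroup regex would match
for f in file1.txt log1.txt notes.md; do
  if echo "$f" | grep -qE '.*file.*'; then
    echo "$f -> f1 (files group)"
  elif echo "$f" | grep -qE '.*log.*'; then
    echo "$f -> f2 (logs group)"
  else
    echo "$f -> not collected"
  fi
done
```

So within files/, only names containing "file" are tailed by f1, and within logs/, only names containing "log" are tailed by f2 — which is exactly what the test below demonstrates.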
Start Flume to monitor the folders.
```shell
cd /opt/bunny/flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/simpleCase/flume-2-taildir-hdfs.conf
```
Test
Under /opt/bunny/flume/datas/tailCase/files, create files and append content to them:
```shell
cd /opt/bunny/flume/datas/tailCase/files
touch file1.txt
echo I am file1 >> file1.txt
touch log1.txt
echo I am log1 >> log1.txt
```
Under /opt/bunny/flume/datas/tailCase/logs, create files and append content to them:
```shell
cd /opt/bunny/flume/datas/tailCase/logs
touch file2.txt
echo I am file2 >> file2.txt
touch log2.txt
echo I am log2 >> log2.txt
```
Check the data on HDFS to verify that Flume collects files from multiple directories in real time: http://192.168.93.102:9870/
Stop the Flume collector, append to the files under logs/ and files/, then restart the collector to verify Flume's checkpoint resume. With the collector stopped:
```shell
cat /opt/bunny/flume/tail_dir.json    # inspect the position file
cd /opt/bunny/flume/datas/tailCase/files
echo I am file1 duandian >> file1.txt
cd /opt/bunny/flume/datas/tailCase/logs
echo I am log2 xuchuan >> log2.txt
```
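Conceptually, the position file is what makes resume work: Flume remembers the byte offset it has read for each file and, on restart, reads only what comes after it. A rough stand-alone sketch of the idea (ordinary shell, not Flume code):

```shell
f=$(mktemp)
echo "line1" > "$f"
pos=$(wc -c < "$f")               # offset recorded while the agent was running
echo "appended while down" >> "$f"
tail -c +"$((pos + 1))" "$f"      # on restart, read only the bytes after the saved offset
rm -f "$f"
```

This prints only the line appended "while the agent was down", never the data already collected.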
Single data source, multiple outputs: use Flume-1 to monitor file changes.
· Flume-1 passes the new content to Flume-2, which stores it on HDFS.
· At the same time, Flume-1 passes the new content to Flume-3, which writes it to the local file system.
Preparation: under /opt/bunny/flume/job, create an enterprise/copy folder to hold this replication example's configuration files.

```shell
mkdir -p /opt/bunny/flume/job/enterprise/copy
```
Under /opt/bunny/flume/datas/, create a simulated log file realtime.log.

```shell
touch /opt/bunny/flume/datas/realtime.log
```
Configuration files. Flume-1's agent configuration flume-1-exec-avro.conf defines one source, two channels and two sinks, feeding flume-2-avro-hdfs and flume-3-avro-file respectively.
```shell
cd /opt/bunny/flume/job/enterprise/copy
vim flume-1-exec-avro.conf
```

```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Replicate the data flow to all channels (replicating is the default anyway)
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/bunny/flume/datas/realtime.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# an avro sink is a data sender (client)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = h102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```
Write Flume-2's agent configuration flume-2-avro-hdfs.conf, then Flume-3's agent configuration flume-3-avro-file.conf, which consumes Flume-1's output and writes it to the local directory /opt/bunny/flume/datas/copy_result.
```shell
vim flume-2-avro-hdfs.conf
```

```properties
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# an avro source is a data-receiving server
a2.sources.r1.type = avro
a2.sources.r1.bind = h102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://h102:8020/flume/copy/%Y%m%d/%H
# prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = copy-
# whether to roll folders by time
a2.sinks.k1.hdfs.round = true
# how many time units before creating a new folder
a2.sinks.k1.hdfs.roundValue = 1
# the time unit
a2.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# how many events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# file type; compression is available via CompressedStream
a2.sinks.k1.hdfs.fileType = DataStream
# how often to roll to a new file, in seconds
a2.sinks.k1.hdfs.rollInterval = 60
# roll size per file, just under 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# rolling is independent of the number of events
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
```
```shell
vim flume-3-avro-file.conf
```

```properties
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = h102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/bunny/flume/datas/copy_result

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
```
Run. First create the local output directory:

```shell
mkdir /opt/bunny/flume/datas/copy_result
```
The local output directory must already exist: the file_roll sink does not create it for you.
Run Flume and start collecting. Start the downstream agents first, then the upstream one (an avro sink is a client and needs its avro-source server to already be up):
```shell
cd /opt/bunny/flume/
bin/flume-ng agent -c conf/ -n a3 -f /opt/bunny/flume/job/enterprise/copy/flume-3-avro-file.conf
bin/flume-ng agent -c conf/ -n a2 -f /opt/bunny/flume/job/enterprise/copy/flume-2-avro-hdfs.conf
bin/flume-ng agent -c conf/ -n a1 -f /opt/bunny/flume/job/enterprise/copy/flume-1-exec-avro.conf
```
Append content to the file to simulate a log being updated in real time:

```shell
cd /opt/bunny/flume/datas/
echo 1999-10-31 09-09-09 >> realtime.log
```
Check the data files on HDFS, then check the data in the /opt/bunny/flume/datas/copy_result directory:

```shell
cd /opt/bunny/flume/datas/copy_result
ll
```
Note: when the file_roll sink writes to local disk, the local files are rolled by time, so a new file is created every interval even if no data was collected — leaving empty files behind.
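If the empty files are a nuisance, the rolling interval can be tuned. A sketch, assuming Flume's documented file_roll behaviour (`sink.rollInterval` defaults to 30 seconds; 0 disables time-based rolling so everything goes into a single file):

```properties
# roll to a new output file every 600 s instead of every 30 s;
# set to 0 to disable time-based rolling entirely
a3.sinks.k1.sink.rollInterval = 600
```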
Multiplexing and interceptors: use Flume to collect port log data from a server and, depending on the log type, send different kinds of logs to different analysis systems.
In real development, a single server may produce many types of logs, and different types may need to go to different analysis systems. This is where the Multiplexing channel selector comes in. Multiplexing works by routing each event to a channel based on the value of some key in the event's Header; a custom interceptor is what assigns different values to that Header key for different event types.

In this example we simulate logs with port data, using digits and letters to stand for different log types. We need a custom interceptor that distinguishes digits from letters, sending each to its own analysis system (Channel).
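The routing decision itself is simple: look at the first character of the event body and pick a channel. A stand-alone sketch of that interceptor-plus-selector logic in plain shell (not Flume code; the channel and host names mirror this example's setup):

```shell
# sketch of the interceptor + multiplexing selector decision
route() {
  case "$1" in
    [a-z]*) echo "letter -> c1 (avro sink to h103)" ;;
    [0-9]*) echo "number -> c2 (avro sink to h104)" ;;
    *)      echo "no mapping" ;;
  esac
}
route "hello flume"    # letter -> c1
route "123 event"      # number -> c2
```

In Flume the interceptor stamps the header (`type=letter` or `type=number`) and the multiplexing selector performs the `case`-like dispatch from header value to channel.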
Create a Maven project, package it, and copy it to Flume. Start by creating a Maven project with the following dependency:
```xml
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
```
Define a CustomInterceptor class that implements the Interceptor interface:
```java
package cn.kittybuny.bigdata;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 1. Get the data from the event
        byte[] body = event.getBody();
        // 2. Check whether the first character is a letter or a digit
        if (body[0] >= 'a' && body[0] <= 'z') {
            event.getHeaders().put("type", "letter");   // letter: set header "type" to "letter"
        } else if (body[0] >= '0' && body[0] <= '9') {
            event.getHeaders().put("type", "number");   // digit: set header "type" to "number"
        }
        // 3. Return the event
        return event;
    }

    // Intercept a batch of events
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    // Builder that Flume uses to construct the interceptor
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
Package the project and copy the jar into Flume's lib directory.
Edit the configuration files, starting with flume-1. On h102, create the folder custom/multi under /opt/bunny/flume/job/ to hold this example's configuration files:

```shell
mkdir -p /opt/bunny/flume/job/custom/multi
```
On h102, configure Flume-1 with one netcat source, two avro sinks on two channels, and the matching ChannelSelector and interceptor.
```shell
vim /opt/bunny/flume/job/custom/multi/flume-1-netcat-avro.conf
```

```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.kittybuny.bigdata.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h103
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = h104
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```
Write the flume-2 and flume-3 configuration files: give flume-2 on h103 and flume-3 on h104 one avro source and one logger sink each.
```shell
vim /opt/bunny/flume/job/custom/multi/flume-2-avro-logger.conf
```

```properties
# agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# source
a2.sources.r1.type = avro
a2.sources.r1.bind = h103
a2.sources.r1.port = 4141

# sink
a2.sinks.k1.type = logger

# channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
```
```shell
vim /opt/bunny/flume/job/custom/multi/flume-3-avro-logger.conf
```

```properties
# agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# source
a3.sources.r1.type = avro
a3.sources.r1.bind = h104
a3.sources.r1.port = 4242

# sink
a3.sinks.k1.type = logger

# channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
```
Distribute the configuration files to h103 and h104.
Start the processes and verify. flume-2 on h103:
```shell
cd /opt/bunny/flume/
bin/flume-ng agent -c conf/ -n a2 -f /opt/bunny/flume/job/custom/multi/flume-2-avro-logger.conf -Dflume.root.logger=INFO,console
```
flume-3 on h104:
```shell
cd /opt/bunny/flume/
bin/flume-ng agent -c conf/ -n a3 -f /opt/bunny/flume/job/custom/multi/flume-3-avro-logger.conf -Dflume.root.logger=INFO,console
```
flume-1 on h102:
```shell
cd /opt/bunny/flume/
bin/flume-ng agent -c conf/ -n a1 -f /opt/bunny/flume/job/custom/multi/flume-1-netcat-avro.conf
```
On h102, send letters and digits to port 44444 with netcat (`nc localhost 44444`). The results appear on h103 and h104: events beginning with a letter show up in the flume-2 console on h103, events beginning with a digit in the flume-3 console on h104.
Aggregation: flume-1 on h102 monitors the file /opt/bunny/flume/datas/realtime.log, flume-2 on h103 monitors the data stream on a port, and flume-3 on h104 receives the data from both flume-1 and flume-2 and prints the combined result to the console.
Configuration files. First create the directory for them:

```shell
mkdir /opt/bunny/flume/job/enterprise/juhe
```
flume-1 configuration file:
```shell
vim /opt/bunny/flume/job/enterprise/juhe/flume-1-exec-avro.conf
```

```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/bunny/flume/datas/realtime.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
flume-2 configuration file:
```shell
vim /opt/bunny/flume/job/enterprise/juhe/flume-2-netcat-avro.conf
```

```properties
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = h103
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = h104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
```
flume-3 configuration file:
```shell
vim /opt/bunny/flume/job/enterprise/juhe/flume-3-avro-logger.conf
```

```properties
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = h104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
```
Distribute the configuration files to h103 and h104.
Run and verify. On h104 (start the downstream receiver first):
```shell
/opt/bunny/flume/bin/flume-ng agent -c conf/ -n a3 -f /opt/bunny/flume/job/enterprise/juhe/flume-3-avro-logger.conf -Dflume.root.logger=INFO,console
```
On h103:
```shell
/opt/bunny/flume/bin/flume-ng agent -c conf/ -n a2 -f /opt/bunny/flume/job/enterprise/juhe/flume-2-netcat-avro.conf
```
On h102:
```shell
/opt/bunny/flume/bin/flume-ng agent -c conf/ -n a1 -f /opt/bunny/flume/job/enterprise/juhe/flume-1-exec-avro.conf
```
On h102, append to the monitored log file:

```shell
echo 'hello' >> /opt/bunny/flume/datas/realtime.log
```
On h103, send data to port 44444 with netcat (`nc h103 44444`). Then check the results in the flume-3 console on h104: both streams should be printed there.
Data flow monitoring