Flume 安装与配置
建议直接下载 Flume 的预编译版本,下载地址 :这里
Component Interface | Type Alias | Implementation Class |
*.Channel | memory | *.channel.MemoryChannel |
*.Channel | jdbc | *.channel.jdbc.JdbcChannel |
*.Channel | file | *.channel.file.FileChannel |
*.Channel | – | *.channel.PseudoTxnMemoryChannel |
*.Channel | – | org.example.MyChannel |
*.Source | avro | *.source.AvroSource |
*.Source | netcat | *.source.NetcatSource |
*.Source | seq | *.source.SequenceGeneratorSource |
*.Source | exec | *.source.ExecSource |
*.Source | syslogtcp | *.source.SyslogTcpSource |
*.Source | multiport_syslogtcp | *.source.MultiportSyslogTCPSource |
*.Source | syslogudp | *.source.SyslogUDPSource |
*.Source | spooldir | *.source.SpoolDirectorySource |
*.Source | http | *.source.http.HTTPSource |
*.Source | thrift | *.source.ThriftSource |
*.Source | jms | *.source.jms.JMSSource |
*.Source | – | *.source.avroLegacy.AvroLegacySource |
*.Source | – | *.source.thriftLegacy.ThriftLegacySource |
*.Source | – | org.example.MySource |
*.Sink | null | *.sink.NullSink |
*.Sink | logger | *.sink.LoggerSink |
*.Sink | avro | *.sink.AvroSink |
*.Sink | hdfs | *.sink.hdfs.HDFSEventSink |
*.Sink | hbase | *.sink.hbase.HBaseSink |
*.Sink | asynchbase | *.sink.hbase.AsyncHBaseSink |
*.Sink | elasticsearch | *.sink.elasticsearch.ElasticSearchSink |
*.Sink | file_roll | *.sink.RollingFileSink |
*.Sink | irc | *.sink.irc.IRCSink |
*.Sink | thrift | *.sink.ThriftSink |
*.Sink | – | org.example.MySink |
*.ChannelSelector | replicating | *.channel.ReplicatingChannelSelector |
*.ChannelSelector | multiplexing | *.channel.MultiplexingChannelSelector |
*.ChannelSelector | – | org.example.MyChannelSelector |
*.SinkProcessor | default | *.sink.DefaultSinkProcessor |
*.SinkProcessor | failover | *.sink.FailoverSinkProcessor |
*.SinkProcessor | load_balance | *.sink.LoadBalancingSinkProcessor |
*.SinkProcessor | – | |
*.interceptor.Interceptor | timestamp | *.interceptor.TimestampInterceptor$Builder |
*.interceptor.Interceptor | host | *.interceptor.HostInterceptor$Builder |
*.interceptor.Interceptor | static | *.interceptor.StaticInterceptor$Builder |
*.interceptor.Interceptor | regex_filter | *.interceptor.RegexFilteringInterceptor$Builder |
*.interceptor.Interceptor | regex_extractor | *.interceptor.RegexFilteringInterceptor$Builder |
*.channel.file.encryption.KeyProvider$Builder | jceksfile | *.channel.file.encryption.JCEFileKeyProvider |
*.channel.file.encryption.KeyProvider$Builder | – | org.example.MyKeyProvider |
*.channel.file.encryption.CipherProvider | aesctrnopadding | *.channel.file.encryption.AESCTRNoPaddingProvider |
*.channel.file.encryption.CipherProvider | – | org.example.MyCipherProvider |
*.serialization.EventSerializer$Builder | text | *.serialization.BodyTextEventSerializer$Builder |
*.serialization.EventSerializer$Builder | avro_event | *.serialization.FlumeEventAvroEventSerializer$Builder |
*.serialization.EventSerializer$Builder | – | org.example.MyEventSerializer$Builder |
Avro Source+Memory Channel+Logger Sink
使用 Avro Source 接收外部数据源,Logger 作为 sink,即通过 Avro RPC 调用,将数据缓存在 channel 中,然后通过 Logger 打印出调用发送的数据。配置 Agent,修改配置文件 conf/flume-conf.properties,内容如下:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind =
agent1.sources.avro-source1.port = 41414
# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.channels.ch1.capacity = 1000
agent1.sources = avro-source1
agent1.sinks = log-sink1
首先,启动 Agent 进程:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1
然后,启动 Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
Avro Source+Memory Channel+HDFS Sink
配置 Agent,修改配置文件 conf/flume-conf-hdfs.properties,内容如下:
# Define a source, channel, sink
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink
# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000
# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind =
agent1.sources.avro-source1.port = 41414
# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.fileType = SequenceFile
agent1.sinks.hdfs-sink1.writeFormat = TEXT
首先,启动 Agent 进程:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1
然后,启动 Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
可以查看同步到 HDFS 上的数据:
hdfs dfs -ls /data/flume
-rw-r--r-- 3 shirdrn supergroup 1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log
-rw-r--r-- 3 shirdrn supergroup 1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log
-rw-r--r-- 3 shirdrn supergroup 259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log
Spooling Directory Source+Memory Channel+HDFS Sink
配置 Agent,修改配置文件 flume-conf-spool.properties,内容如下:
# Define source, channel, sink
agent1.sources = spool-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1
# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000
# Define and configure an Spool directory source
agent1.sources.spool-source1.channels = ch1
agent1.sources.spool-source1.type = spooldir
agent1.sources.spool-source1.spoolDir = /home/shirdrn/data/
agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?
agent1.sources.spool-source1.batchSize = 50
agent1.sources.spool-source1.inputCharset = UTF-8
# Define and configure a hdfs sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.fileType = SequenceFile
agent1.sinks.hdfs-sink1.writeFormat = TEXT
agent1.sinks.hdfs-sink1.rollInterval = 0
启动 Agent 进程,执行如下命令:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1
可以查看 HDFS 上同步过来的数据:
hdfs dfs -ls /data/flume
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log
-rw-r--r-- 3 shirdrn supergroup 1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log
Exec Source+Memory Channel+File Roll Sink
配置 Agent,修改配置文件 flume-conf-file.properties,内容如下:
# Define source, channel, sink
agent1.sources = tail-source1
agent1.channels = ch1
agent1.sinks = file-sink1
# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000
# Define and configure an Exec source
agent1.sources.tail-source1.channels = ch1
agent1.sources.tail-source1.type = exec
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
agent1.sources.tail-source1.shell = /bin/sh -c
agent1.sources.tail-source1.batchSize = 50
# Define and configure a File roll sink
# and connect it to the other end of the same channel.
agent1.sinks.file-sink1.channel = ch1
agent1.sinks.file-sink1.type = file_roll
agent1.sinks.file-sink1.batchSize = 100
agent1.sinks.file-sink1.serializer = TEXT
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data
启动 Agent 进程,执行如下命令:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1
可以查看 File Roll Sink 对应的本地文件系统目录 /home/shirdrn/sink_data 下,示例如下所示:
-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
-rw-rw-r-- 1 shirdrn shirdrn 0 Sep 17 11:37 1410924990039-3
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5
Architecture Overview: 架构概览
Flume NG 架构,如图所示:
- Event:一个数据单元,带有一个可选的消息头
- Flow:Event 从源点到达目的点的迁移的抽象
- Client:操作位于源点处的 Event,将其发送到 Flume Agent
- Agent:一个独立的 Flume 进程,包含组件 Source、Channel、Sink
- Source:用来消费传递到该组件的 Event
- Channel:中转 Event 的一个临时存储,保存有 Source 组件传递过来的 Event
- Sink:从 Channel 中读取并移除 Event,将 Event 传递到 Flow Pipeline 中的下一个 Agent(如果有的话)
外部系统产生日志,直接通过 Flume 的 Agent 的 Source 组件将事件(如日志行)发送到中间临时的 channel 组件,最后传递给 Sink 组件,HDFS Sink 组件可以直接把数据存储到 HDFS 集群上。一个最基本 Flow 的配置,格式如下:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
表示配置一个 Agent 的名称,一个 Agent 肯定有一个名称。<Source1> <Source2>
是 Agent 的 Source 组件的名称,消费传递过来的 Event。<Channel1> <Channel2>
是 Agent 的 Channel 组件的名称。<Sink1> <Sink2>
是 Agent 的 Sink 组件的名称,从 Channel 中消费(移除)Event。
上面配置内容中,第一组中配置 Source、Sink、Channel,它们的值可以有 1 个或者多个;第二组中配置 Source 将把数据存储(Put )到 哪一个 Channel 中,可以存储到 1 个或多个 Channel 中,同一个 Source 将数据存储到多个 Channel 中,实际上是 Replication;第三组中配置 Sink 从哪一个 Channel 中取(Task )数据,一个 Sink 只能从一个 Channel 中取数据。
Flow Pipeline
多个 Agent 顺序连接
可以将多个 Agent 顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的 Agent 的数量,因为数据流经的路径变长了,如果不考虑 failover 的话,出现故障将影响整个 Flow 上的 Agent 收集服务。
多个 Agent 的数据汇聚到同一个 Agent
这种情况应用的场景比较多,比如要收集 Web 网站的用户行为日志,Web 网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每 个节点都配置一个 Agent 来单独收集日志数据,然后多个 Agent 将数据最终汇聚到一个用来存储数据存储系统,如 HDFS 上。
多路(Multiplexing ) Agent
Replication 方式,可以将 最前端的数据源复制多份,分别传递到多个 channel 中,每个 channel 接收到的数据都是相同的,配置格式,如下所示:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating
上面指定了 selector 的 type 的值为 replication,其他的配置没有指定,使用的 Replication 方式,Source1 会将数据分 别存储到 Channel1 和 Channel2,这两个 channel 里面存储的数据是相同的,然后数据被传递到 Sink1 和 Sink2。
Multiplexing 方式,selector 可以根据 header 的值来确定数据传递到哪一个 channel,配置格式,如下所示:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
<Agent>.sources.<Source1>.selector.default = <Channel2>
上面 selector 的 type 的值为 multiplexing,同时配置 selector 的 header 信息,还配置了多个 selector 的 mapping 的值,即 header 的值:如果 header 的值为 Value1、Value2,数据从 Source1 路由到 Channel1;如果 header 的值为 Value2、Value3,数据从 Source1 路由到 Channel2。
Load Balance: 负载均衡
Load balancing Sink Processor 能够实现 load balance 功能,上图 Agent1 是一个路由节点,负责将 Channel 暂存的 Event 均衡到对应的多个 Sink 组件上,而每个 Sink 组件分别连 接到一个独立的 Agent 上,示例配置,如下所示:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
Failover Sink Processor 能够实现 failover 功能,具体流程类似 load balance,但是内部处理机制与 load balance 完全不同:Failover Sink Processor 维护一个优先级 Sink 组件列表,只要有一个 Sink 组件可用,Event 就被传递到下一个组件。如果一个 Sink 能够成功处理 Event,则会加入到一个 Pool 中,否则会被移出 Pool 并计算失败次数,设置一个惩罚因子,示例配置如下所示:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000