Installation and Configuration

Installing and Configuring Flume

It is recommended to download a prebuilt binary release of Flume from the Apache Flume download page.
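
For example, a minimal install sketch (the Flume version and archive name here are assumptions; substitute the release you actually downloaded):

# Unpack the binary release and create working configs from the templates
tar -xzf apache-flume-1.5.0-bin.tar.gz
cd apache-flume-1.5.0-bin
cp conf/flume-conf.properties.template conf/flume-conf.properties
cp conf/flume-env.sh.template conf/flume-env.sh
bin/flume-ng version    # verify the installation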

Flume wires components together by type alias. The table below maps each built-in alias to its implementation class; the package prefix org.apache.flume is abbreviated as *, and – marks entries with no alias (legacy components and custom classes):

Component Interface                            Type Alias            Implementation Class
*.Channel                                      memory                *.channel.MemoryChannel
*.Channel                                      jdbc                  *.channel.jdbc.JdbcChannel
*.Channel                                      file                  *.channel.file.FileChannel
*.Channel                                      –                     *.channel.PseudoTxnMemoryChannel
*.Channel                                      –                     org.example.MyChannel
*.Source                                       avro                  *.source.AvroSource
*.Source                                       netcat                *.source.NetcatSource
*.Source                                       seq                   *.source.SequenceGeneratorSource
*.Source                                       exec                  *.source.ExecSource
*.Source                                       syslogtcp             *.source.SyslogTcpSource
*.Source                                       multiport_syslogtcp   *.source.MultiportSyslogTCPSource
*.Source                                       syslogudp             *.source.SyslogUDPSource
*.Source                                       spooldir              *.source.SpoolDirectorySource
*.Source                                       http                  *.source.http.HTTPSource
*.Source                                       thrift                *.source.ThriftSource
*.Source                                       jms                   *.source.jms.JMSSource
*.Source                                       –                     *.source.avroLegacy.AvroLegacySource
*.Source                                       –                     *.source.thriftLegacy.ThriftLegacySource
*.Source                                       –                     org.example.MySource
*.Sink                                         null                  *.sink.NullSink
*.Sink                                         logger                *.sink.LoggerSink
*.Sink                                         avro                  *.sink.AvroSink
*.Sink                                         hdfs                  *.sink.hdfs.HDFSEventSink
*.Sink                                         hbase                 *.sink.hbase.HBaseSink
*.Sink                                         asynchbase            *.sink.hbase.AsyncHBaseSink
*.Sink                                         elasticsearch         *.sink.elasticsearch.ElasticSearchSink
*.Sink                                         file_roll             *.sink.RollingFileSink
*.Sink                                         irc                   *.sink.irc.IRCSink
*.Sink                                         thrift                *.sink.ThriftSink
*.Sink                                         –                     org.example.MySink
*.ChannelSelector                              replicating           *.channel.ReplicatingChannelSelector
*.ChannelSelector                              multiplexing          *.channel.MultiplexingChannelSelector
*.ChannelSelector                              –                     org.example.MyChannelSelector
*.SinkProcessor                                default               *.sink.DefaultSinkProcessor
*.SinkProcessor                                failover              *.sink.FailoverSinkProcessor
*.SinkProcessor                                load_balance          *.sink.LoadBalancingSinkProcessor
*.SinkProcessor                                –                     org.example.MySinkProcessor
*.interceptor.Interceptor                      timestamp             *.interceptor.TimestampInterceptor$Builder
*.interceptor.Interceptor                      host                  *.interceptor.HostInterceptor$Builder
*.interceptor.Interceptor                      static                *.interceptor.StaticInterceptor$Builder
*.interceptor.Interceptor                      regex_filter          *.interceptor.RegexFilteringInterceptor$Builder
*.interceptor.Interceptor                      regex_extractor       *.interceptor.RegexExtractorInterceptor$Builder
*.channel.file.encryption.KeyProvider$Builder  jceksfile             *.channel.file.encryption.JCEFileKeyProvider
*.channel.file.encryption.KeyProvider$Builder  –                     org.example.MyKeyProvider
*.channel.file.encryption.CipherProvider       aesctrnopadding       *.channel.file.encryption.AESCTRNoPaddingProvider
*.channel.file.encryption.CipherProvider       –                     org.example.MyCipherProvider
*.serialization.EventSerializer$Builder        text                  *.serialization.BodyTextEventSerializer$Builder
*.serialization.EventSerializer$Builder        avro_event            *.serialization.FlumeEventAvroEventSerializer$Builder
*.serialization.EventSerializer$Builder        –                     org.example.MyEventSerializer$Builder
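
A custom component is wired in by using its fully qualified class name as the type; a hypothetical example (org.example.MyChannel is the placeholder class from the table):

agent1.channels.ch1.type = org.example.MyChannel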

Avro Source+Memory Channel+Logger Sink

Use an Avro source to receive data from an external client, with a logger as the sink: data sent via Avro RPC calls is buffered in the channel, and the logger sink then prints the received events. Configure the Agent by editing conf/flume-conf.properties as follows:

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

First, start the Agent process:

bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1

Then, start the Avro client and send data:

bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
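
If /usr/local/programs/logs/sync.log does not exist yet, any plain-text file will do; a throwaway test file can be generated first (test-only sketch):

mkdir -p /usr/local/programs/logs
seq 1 1000 | sed 's/^/test event /' > /usr/local/programs/logs/sync.log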

Avro Source+Memory Channel+HDFS Sink

Configure the Agent by editing conf/flume-conf-hdfs.properties as follows:

# Define a source, channel, sink
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define an HDFS sink and connect it to the other
# end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text

First, start the Agent process:

bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=DEBUG,console -n agent1

Then, start the Avro client and send data:

bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console

You can then inspect the data synced to HDFS:

hdfs dfs -ls /data/flume

Sample output:

-rw-r--r--   3 shirdrn supergroup    1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log
-rw-r--r--   3 shirdrn supergroup    1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log
-rw-r--r--   3 shirdrn supergroup     259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log
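
Because the sink writes SequenceFiles, view a file's contents with hdfs dfs -text (which decodes SequenceFiles) rather than -cat:

hdfs dfs -text /data/flume/sync_file.1410849320761.log | head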

Spooling Directory Source+Memory Channel+HDFS Sink

Configure the Agent by editing flume-conf-spool.properties as follows:

# Define source, channel, sink
agent1.sources = spool-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure a Spooling Directory source
# (backslashes in ignorePattern are doubled because the properties
# loader consumes one level of escaping)
agent1.sources.spool-source1.channels = ch1
agent1.sources.spool-source1.type = spooldir
agent1.sources.spool-source1.spoolDir = /home/shirdrn/data/
agent1.sources.spool-source1.ignorePattern = event(_\\d{4}-\\d{2}-\\d{2}_\\d{2}_\\d{2})?\\.log(\\.COMPLETED)?
agent1.sources.spool-source1.batchSize = 50
agent1.sources.spool-source1.inputCharset = UTF-8

# Define and configure an HDFS sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
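
Files dropped into spoolDir must be complete and immutable; Flume renames each file with a .COMPLETED suffix once it has been fully ingested (note that the ignorePattern above deliberately skips files matching event*.log). A common pattern (the staging path here is an assumption) is to write the file elsewhere on the same filesystem, then move it in atomically:

# Stage the finished file, then rename it into the spool directory
mkdir -p /home/shirdrn/.staging
cp /var/log/app/current.log /home/shirdrn/.staging/app_2014-09-17_10_52.log
mv /home/shirdrn/.staging/app_2014-09-17_10_52.log /home/shirdrn/data/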

Start the Agent process with the following command:

bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1

You can then inspect the data synced to HDFS:

hdfs dfs -ls /data/flume

Sample output:

-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log
-rw-r--r--   3 shirdrn supergroup       1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log

Exec Source+Memory Channel+File Roll Sink

Configure the Agent by editing flume-conf-file.properties as follows:

# Define source, channel, sink
agent1.sources = tail-source1
agent1.channels = ch1
agent1.sinks = file-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure an Exec source
agent1.sources.tail-source1.channels = ch1
agent1.sources.tail-source1.type = exec
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
agent1.sources.tail-source1.shell = /bin/sh -c
agent1.sources.tail-source1.batchSize = 50

# Define and configure a File roll sink
# and connect it to the other end of the same channel.
agent1.sinks.file-sink1.channel = ch1
agent1.sinks.file-sink1.type = file_roll
agent1.sinks.file-sink1.batchSize = 100
agent1.sinks.file-sink1.sink.serializer = TEXT
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data
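
To exercise the pipeline, a throwaway generator (test-only assumption) can append lines to the tailed file:

while true; do echo "event $(date +%s)" >> /home/shirdrn/data/event.log; sleep 0.01; done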

Start the Agent process with the following command:

bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1

You can then inspect the local directory /home/shirdrn/sink_data that the File Roll Sink writes to; a sample listing:

-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
-rw-rw-r-- 1 shirdrn shirdrn        0 Sep 17 11:37 1410924990039-3
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5

Architecture Overview

[Figure: Flume NG architecture]

The main core concepts are:

  • Event: a unit of data, carrying an optional message header
  • Flow: an abstraction of the movement of Events from an origin to a destination
  • Client: operates at the origin of the Events and sends them to a Flume Agent
  • Agent: an independent Flume process containing the Source, Channel, and Sink components
  • Source: consumes the Events delivered to it
  • Channel: a transient store for Events, holding the Events passed in by the Source
  • Sink: reads and removes Events from the Channel, passing them to the next Agent in the Flow pipeline (if there is one)

An external system produces logs; the Source component of a Flume Agent receives the events (e.g., log lines) and puts them into an intermediate Channel, from which the Sink component finally takes them; an HDFS Sink, for instance, can write the data directly to an HDFS cluster. The most basic Flow is configured in the following format:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

The names in angle brackets can be changed to suit your deployment or business needs. In detail:

  • <Agent> is the name of the Agent being configured; every Agent must have a name.
  • <Source1> <Source2> are the names of the Agent's Source components, which consume the Events delivered to them.
  • <Channel1> <Channel2> are the names of the Agent's Channel components.
  • <Sink1> <Sink2> are the names of the Agent's Sink components, which consume (remove) Events from a Channel.

In the first group above, the Sources, Sinks, and Channels are declared; each may list one or more names. The second group configures which Channel(s) a Source puts its data into; a Source can write to one or more Channels, and writing the same data to multiple Channels is effectively replication. The third group configures which Channel each Sink takes its data from; a Sink can only take data from a single Channel.

Flow Pipeline

Chaining multiple Agents in sequence

Multiple Agents can be chained in sequence, so that data from the original source is collected hop by hop into the final storage system. This is the simplest topology, but the number of chained Agents should generally be kept under control: the longer the path the data travels, the more a failure at any hop (absent failover) disrupts collection across the entire Flow.
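
A minimal two-hop sketch (the agent names, the collector-host hostname, and file paths are assumptions): the upstream agent forwards events over Avro RPC, and the downstream agent receives them with an Avro source, the standard way to join two Flume agents.

# --- agent "shipper" on the upstream node ---
shipper.sources = tail1
shipper.channels = ch1
shipper.sinks = avro1
shipper.channels.ch1.type = memory
shipper.sources.tail1.type = exec
shipper.sources.tail1.command = tail -F /var/log/app.log
shipper.sources.tail1.channels = ch1
shipper.sinks.avro1.type = avro
shipper.sinks.avro1.hostname = collector-host
shipper.sinks.avro1.port = 41414
shipper.sinks.avro1.channel = ch1

# --- agent "collector" on collector-host ---
collector.sources = avro-in
collector.channels = ch1
collector.sinks = hdfs1
collector.channels.ch1.type = memory
collector.sources.avro-in.type = avro
collector.sources.avro-in.bind = 0.0.0.0
collector.sources.avro-in.port = 41414
collector.sources.avro-in.channels = ch1
collector.sinks.hdfs1.type = hdfs
collector.sinks.hdfs1.hdfs.path = hdfs://h1:8020/data/flume/
collector.sinks.hdfs1.channel = ch1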

Converging multiple Agents on a single Agent

This topology has many applications. For example, to collect user-behavior logs from a website deployed as a load-balanced cluster for availability, every node produces logs; each node can run its own Agent to collect them, and the Agents then converge the data on a single Agent that writes it to a storage system such as HDFS.
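
Fan-in needs no new machinery: every node runs the same shipper configuration as in the sketch above, with each Avro sink pointing at the one collector (hostname and port are assumptions), and the collector's single Avro source receives from all of them:

# Identical on every web node: point the Avro sink at the shared collector
shipper.sinks.avro1.type = avro
shipper.sinks.avro1.hostname = collector-host
shipper.sinks.avro1.port = 41414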

Multi-way (Multiplexing) Agents

Replication

In replicating mode, the data from the upstream source is copied into multiple channels; every channel receives an identical copy. The configuration format is as follows:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

The selector type above is set to replicating (this is also the default when no selector is configured). Source1 stores each event in both Channel1 and Channel2; the two channels hold identical data, which is then delivered on to Sink1 and Sink2 respectively.
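
A concrete sketch of the pattern (all names and the netcat source are illustrative choices): one source replicated into two memory channels, drained by a logger sink and a file_roll sink.

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

a1.channels.c1.type = memory
a1.channels.c2.type = memory

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /tmp/flume-replica
a1.sinks.k2.channel = c2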

Multiplexing

In multiplexing mode, the selector decides which channel an event is delivered to based on the value of a header. The configuration format is as follows:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

Here the selector type is multiplexing, the header to inspect is configured, and a mapping from header values to channels is given: events whose header value is Value1 are routed from Source1 to Channel1, Value2 to both Channel1 and Channel2, Value3 to Channel2, and any other value to the default, Channel2.
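
As a concrete sketch (the state header and its values are illustrative, in the style of the Flume user guide's fan-out example):

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CA = c1
a1.sources.r1.selector.mapping.AZ = c2
a1.sources.r1.selector.mapping.NY = c1 c2
a1.sources.r1.selector.default = c1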

Load Balance

The Load Balancing Sink Processor implements load balancing: a routing Agent balances the Events buffered in its Channel across the Sinks of a sink group, each of which connects to an independent downstream Agent. An example configuration:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000

The Failover Sink Processor implements failover. The flow looks similar to load balancing, but the internal mechanism is entirely different: it maintains a prioritized list of Sinks, guaranteeing that Events are delivered as long as at least one Sink is available. A Sink that processes Events successfully stays in the live pool; one that fails is moved out and given a cooldown penalty that grows with consecutive failures, capped by maxpenalty. An example configuration:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000