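Flume Channel Selectors
Flume provides two channel selectors: the Replicating Channel Selector (the default) and the Multiplexing Channel Selector. The former replicates each event to every channel; the latter routes (multiplexes) each event to specific channels. If no selector is configured, the replicating selector is used.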
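Conceptually, the two selectors differ only in which channels an event is put into. The minimal Python sketch below illustrates the replicating case. It is not Flume's Java implementation; the dict-based event shape and the function name `replicating_select` are hypothetical.

```python
from typing import Dict, List

# Hypothetical event shape for illustration: {"headers": {...}, "body": "..."}
Event = dict


def replicating_select(event: Event, channels: Dict[str, List[Event]]) -> List[str]:
    """Put the event into every configured channel and return their names."""
    for queue in channels.values():
        queue.append(event)
    return list(channels)


if __name__ == "__main__":
    channels = {"c1": [], "c2": []}
    print(replicating_select({"headers": {}, "body": "hello"}, channels))  # ['c1', 'c2']
    print(len(channels["c1"]), len(channels["c2"]))  # 1 1
```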
The two examples below implement the Replicating Channel Selector and the Multiplexing Channel Selector respectively.
Replicating Channel Selector
Goal
Implementation
# Name the components on this agent
Start the agent
flume-ng agent \
Send messages
telnet localhost 44444
Result: success
Multiplexing Channel Selector
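Goal
On the way from the Source to the Channel, every event passes through an interceptor, which adds the configured key/value pair to the headers of every event. When the events reach the selector, each one is sent to a different channel (and therefore a different sink) depending on the value of that header.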
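The interceptor-plus-selector flow described above can be sketched in Python as follows. This is a conceptual model, not Flume's API: the function names are hypothetical. Keeping an existing header mirrors the static interceptor's `preserveExisting` option, which is assumed here to default to `true`.

```python
from typing import Dict, List


def static_interceptor(event: dict, key: str, value: str) -> dict:
    """Add a fixed header to the event (like interceptors.i1.type = static).
    An existing header with the same key is kept (assumed preserveExisting = true)."""
    event["headers"].setdefault(key, value)
    return event


def multiplexing_select(event: dict, header: str,
                        mapping: Dict[str, List[str]], default: List[str]) -> List[str]:
    """Pick channels by the value of one header; unmapped or missing values use the default."""
    return mapping.get(event["headers"].get(header), default)


if __name__ == "__main__":
    # Mirrors selector.header = state, mapping.UA = c1, mapping.UB = c2, default = c3
    mapping = {"UA": ["c1"], "UB": ["c2"]}
    for value in ("UA", "UB", "UC"):
        event = static_interceptor({"headers": {}, "body": "hello"}, "state", value)
        print(value, "->", multiplexing_select(event, "state", mapping, ["c3"]))
    # UA -> ['c1']
    # UB -> ['c2']
    # UC -> ['c3']
```

This matches the example configuration: events arriving on port 44443 carry `state=UC`, which has no mapping, so they fall through to the default channel c3 and end up in HDFS.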
Implementation
nc-memory-arvo1.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44441
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = aliyun
a1.sinks.k1.port = 55555
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = UA
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
nc-memory-arvo2.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44442
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = aliyun
a1.sinks.k1.port = 55555
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = UB
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
nc-memory-arvo3.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44443
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = aliyun
a1.sinks.k1.port = 55555
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = UC
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
avro-memory-multi.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = aliyun
a1.sources.r1.port = 55555
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k2.type = logger
a1.sinks.k3.type = hdfs
a1.sinks.k3.hdfs.path = /flume/%y-%m-%d/%H/%M
a1.sinks.k3.hdfs.filePrefix = multiplexing-
a1.sinks.k3.hdfs.useLocalTimeStamp = true
a1.sinks.k3.hdfs.fileType=DataStream
a1.sinks.k3.hdfs.writeFormat=Text
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.UA = c1
a1.sources.r1.selector.mapping.UB = c2
a1.sources.r1.selector.default = c3
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c3.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
Start the agents
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/nc-memory-arvo1.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/nc-memory-arvo2.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/nc-memory-arvo3.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/avro-memory-multi.conf \
-Dflume.root.logger=INFO,console
Send messages
telnet aliyun 44441
telnet aliyun 44442
telnet aliyun 44443
Result: success
Flume Sink Processors
The two most commonly used sink processors are the Failover Sink Processor and the Load balancing Sink Processor.
Failover Sink Processor
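The Failover Sink Processor lets you build a disaster-recovery style sink group on the sink side of an agent. As the official documentation puts it, the Failover Sink Processor maintains a prioritized list of sinks, guaranteeing that as long as one sink is available, events will be processed (delivered).
It works by combining several sinks into a sink group in which every sink has a priority, and the sink with the higher priority is activated first. If a sink fails while sending an event, the sink with the next highest priority is used to send it instead. For example, a sink with priority 100 is activated before a sink with priority 80. If no priority is specified, priority is determined by the order in which the sinks are specified in the configuration. Failed sinks are placed in a cool-down period that grows with consecutive failures, up to processor.maxpenalty, before they are retried.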
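The selection logic described above can be modeled with a small Python sketch. It is only an illustration: the class `FailoverGroup` and its parameters are hypothetical. The exact cool-down schedule is also an assumption: 2 s after the first failure, doubling up to `maxpenalty`. It is based on the documented rule that the cool-down grows with consecutive failures.

```python
import time
from typing import Callable, Dict


class FailoverGroup:
    """Conceptual failover sink group; not Flume's FailoverSinkProcessor class.

    sinks: name -> callable that delivers an event and raises on failure.
    priorities: name -> priority; larger values are tried first, ties keep
    the order in which the sinks were given.
    """

    def __init__(self, sinks: Dict[str, Callable], priorities: Dict[str, int],
                 max_penalty_ms: int = 30000,
                 clock: Callable[[], float] = time.monotonic):
        self.sinks = sinks
        self.priorities = priorities
        self.max_penalty_ms = max_penalty_ms
        self.clock = clock
        self.failures = {name: 0 for name in sinks}
        self.retry_at = {name: 0.0 for name in sinks}

    def process(self, event) -> str:
        now = self.clock()
        # sorted() is stable, so equal priorities keep their original order.
        for name in sorted(self.sinks, key=lambda n: -self.priorities.get(n, 0)):
            if self.retry_at[name] > now:
                continue  # still cooling down after an earlier failure
            try:
                self.sinks[name](event)
            except Exception:
                self.failures[name] += 1
                # ASSUMPTION: cool-down starts at 2 s and doubles, capped by maxpenalty.
                penalty_ms = min(self.max_penalty_ms, 1000 * 2 ** self.failures[name])
                self.retry_at[name] = now + penalty_ms / 1000
                continue
            self.failures[name] = 0
            return name
        raise RuntimeError("no sink in the group could deliver the event")


if __name__ == "__main__":
    delivered = []

    def unreachable(event):
        raise ConnectionError("k2 is down")

    # Same priorities as the example configuration: k2 = 10, k1 = 5, maxpenalty = 10000
    group = FailoverGroup({"k1": delivered.append, "k2": unreachable},
                          {"k1": 5, "k2": 10}, max_penalty_ms=10000)
    print(group.process("hello"), delivered)  # k1 ['hello']
```

Passing in a `clock` function keeps the cool-down logic testable without waiting in real time.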
Property Name | Default | Description |
---|---|---|
sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be failover |
`processor.priority.<sinkName>` | – | Priority value. A sink with a higher priority value is activated earlier |
processor.maxpenalty | 30000 | The maximum backoff period for the failed Sink (in millis) |
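Goal
![Failover Sink Processor](https://yerias.github.io/flume_img/Failover Sink Processor.jpg)
Agent1 sends events to a sink group. If the sink currently in use fails to deliver them, another sink in the group is activated and takes over. In the configuration below, k2 (priority 10) is used first and k1 (priority 5) takes over if k2 fails.
Implementation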
nc-memory-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = aliyun
a1.sources.r1.port = 44444
# Sink group: failover; k2 (priority 10) is preferred over k1 (priority 5)
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = aliyun
a1.sinks.k1.port = 55551
# Describe the sink
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = aliyun
a1.sinks.k2.port = 55552
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
avro1-memory-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = aliyun
a1.sources.r1.port = 55551
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
avro2-memory-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = aliyun
a1.sources.r1.port = 55552
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agents
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/failover/nc-memory-avro.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/failover/avro1-memory-logger.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/failover/avro2-memory-logger.conf \
-Dflume.root.logger=INFO,console
Send messages
telnet aliyun 44444
Result: success
Load balancing Sink Processor
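The Load balancing Sink Processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks over which the load must be distributed. The implementation supports distributing load with either the round_robin or the random selection mechanism. The default is round_robin, but it can be overridden in the configuration. Custom selection mechanisms are supported through custom classes that inherit from AbstractSinkSelector.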
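The two built-in selection mechanisms can be pictured as iterators over the active sinks. The Python sketch below is a simplified model, not Flume's `AbstractSinkSelector` API. The class and method names are hypothetical.

```python
import random
from typing import Iterator, List, Optional


class RoundRobinSelector:
    """Each call starts one position further along the sink list (conceptual)."""

    def __init__(self, sinks: List[str]):
        self.sinks = list(sinks)
        self._next = 0

    def sink_iterator(self) -> Iterator[str]:
        begin = self._next
        self._next = (begin + 1) % len(self.sinks)
        # Every sink appears once, starting at `begin`, so a failed attempt
        # can fall through to the next sink in line.
        return iter(self.sinks[begin:] + self.sinks[:begin])


class RandomSelector:
    """Each call yields every sink once, in a freshly shuffled order (conceptual)."""

    def __init__(self, sinks: List[str], rng: Optional[random.Random] = None):
        self.sinks = list(sinks)
        self.rng = rng or random.Random()

    def sink_iterator(self) -> Iterator[str]:
        order = self.sinks[:]
        self.rng.shuffle(order)
        return iter(order)


if __name__ == "__main__":
    rr = RoundRobinSelector(["k1", "k2"])
    print([next(rr.sink_iterator()) for _ in range(4)])  # ['k1', 'k2', 'k1', 'k2']
    print(sorted(RandomSelector(["k1", "k2"]).sink_iterator()))  # ['k1', 'k2']
```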
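When invoked, the selector picks the next sink using its configured selection mechanism and invokes it. For round_robin and random, if the selected sink fails to deliver the event, the processor picks the next available sink via its configured selection mechanism. This implementation does not blacklist the failing sink; it keeps optimistically trying every available sink. If all sink invocations fail, the selector propagates the failure to the sink runner, so the whole delivery attempt fails.
If backoff is enabled, the sink processor blacklists sinks that fail and removes them from selection for a given timeout. When the timeout ends, if the sink is still unresponsive, the timeout is increased exponentially to avoid getting stuck in long waits on unresponsive sinks. With backoff disabled, in round-robin mode all of a failed sink's load is passed on to the next sink in line, so the load is no longer evenly balanced.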
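The uneven-load effect described in the last sentence is easiest to see in a small simulation. The Python sketch below is a simplified model of a round_robin group, not Flume's LoadBalancingSinkProcessor. The class name, the 1 s initial timeout and the frozen clock are assumptions for illustration.

```python
import time
from collections import Counter
from typing import Callable, Dict


class LoadBalancer:
    """Conceptual round-robin sink group with optional backoff (not Flume's class).

    With backoff=True a failed sink is blacklisted; its timeout doubles on each
    consecutive failure and is capped at max_timeout_ms. The 1 s starting
    timeout is an assumption for illustration.
    """

    def __init__(self, sinks: Dict[str, Callable], backoff: bool = False,
                 max_timeout_ms: int = 30000,
                 clock: Callable[[], float] = time.monotonic):
        self.names = list(sinks)
        self.sinks = sinks
        self.backoff = backoff
        self.max_timeout_ms = max_timeout_ms
        self.clock = clock
        self._next = 0
        self.failures = {n: 0 for n in self.names}
        self.blocked_until = {n: 0.0 for n in self.names}

    def process(self, event) -> str:
        now = self.clock()
        # Rotate only over sinks that are not blacklisted.
        active = [n for n in self.names
                  if not (self.backoff and self.blocked_until[n] > now)]
        if not active:
            raise RuntimeError("every sink is backed off")
        if self._next >= len(active):  # the active list may have shrunk
            self._next = 0
        begin = self._next
        self._next = (begin + 1) % len(active)
        for name in active[begin:] + active[:begin]:
            try:
                self.sinks[name](event)
            except Exception:
                self.failures[name] += 1
                if self.backoff:
                    timeout_ms = min(self.max_timeout_ms,
                                     1000 * 2 ** (self.failures[name] - 1))
                    self.blocked_until[name] = now + timeout_ms / 1000
                continue  # optimistically try the next sink in line
            self.failures[name] = 0
            return name
        raise RuntimeError("all sinks failed")


def demo(backoff: bool) -> Counter:
    """Send 6 events through k1, k2, k3 while k2 is down; the clock is frozen."""
    def ok(event):
        pass

    def down(event):
        raise ConnectionError("sink unreachable")

    lb = LoadBalancer({"k1": ok, "k2": down, "k3": ok},
                      backoff=backoff, clock=lambda: 0.0)
    return Counter(lb.process(i) for i in range(6))


if __name__ == "__main__":
    print(sorted(demo(False).items()))  # [('k1', 2), ('k3', 4)]
    print(sorted(demo(True).items()))   # [('k1', 3), ('k3', 3)]
```

With k2 down and backoff disabled, every one of k2's turns falls through to k3, so k3 handles twice as many events as k1. With backoff enabled, k2 is taken out of the rotation and the two remaining sinks share the load evenly.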
Property Name | Default | Description |
---|---|---|
processor.sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be load_balance |
processor.backoff | false | Should failed sinks be backed off exponentially. |
processor.selector | round_robin | Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector |
processor.selector.maxTimeOut | 30000 | Used by backoff selectors to limit exponential backoff (in milliseconds) |
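Goal
![Load balancing Sink Processor](https://yerias.github.io/flume_img/Load balancing Sink Processor.jpg)
Agent1 sends events, and the sink processor distributes them across the sinks in the group according to the configured selection mechanism (random in the configuration below).
Implementation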
nc-memory-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = aliyun
a1.sources.r1.port = 44444
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = aliyun
a1.sinks.k1.port = 55551
# Describe the sink
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = aliyun
a1.sinks.k2.port = 55552
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
avro1-memory-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = aliyun
a1.sources.r1.port = 55551
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
avro2-memory-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = aliyun
a1.sources.r1.port = 55552
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agents
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/load_balancing/nc-memory-avro.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/load_balancing/avro1-memory-logger.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/load_balancing/avro2-memory-logger.conf \
-Dflume.root.logger=INFO,console
Send messages
telnet aliyun 44444
Result: success
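Two Channel Types
The two most commonly used channel types are File and Memory. A Memory channel buffers events in memory, so it is fast but gives no durability guarantee: events still in the channel are lost if the agent process or its machine goes down. A File channel persists events to disk, so it is slower but much safer.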
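The trade-off can be illustrated with two toy queues in Python. These are hypothetical classes, far simpler than Flume's real File Channel, which also uses checkpoint and data directories and transactions. The point is only that syncing events to disk costs time but lets them survive a restart.

```python
import json
import os
import tempfile


class MemoryQueue:
    """Toy memory channel: events exist only inside this process."""

    def __init__(self):
        self.events = []

    def put(self, event: str) -> None:
        self.events.append(event)


class FileQueue:
    """Toy file channel: each event is appended to a file and synced to disk,
    and a new instance reloads whatever the file already holds."""

    def __init__(self, path: str):
        self.path = path
        self.events = []
        if os.path.exists(path):
            with open(path, encoding="utf-8") as f:
                self.events = [json.loads(line) for line in f]

    def put(self, event: str) -> None:
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")
            f.flush()
            os.fsync(f.fileno())  # one disk sync per event: slower, but durable
        self.events.append(event)


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "channel.log")
    memory, disk = MemoryQueue(), FileQueue(path)
    for event in ("a", "b"):
        memory.put(event)
        disk.put(event)
    # Simulate an agent restart by creating fresh instances.
    print(MemoryQueue().events, FileQueue(path).events)  # [] ['a', 'b']
```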