Flume Channel Selectors, Sink Processors, and the Two Channel Types

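Flume Channel Selectors

Flume's two commonly used Channel selectors are the Replicating Channel Selector (the default) and the Multiplexing Channel Selector. The first copies every Event to all of the Source's Channels; the second routes each Event to a Channel chosen by a header value.

The two examples below implement the Replicating Channel Selector and the Multiplexing Channel Selector in turn.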

Replicating Channel Selector

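  1. Goal

    A single netcat Source replicates every Event into two memory Channels: c1 feeds an HDFS Sink and c2 feeds a logger Sink.

  2. Configuration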

    nc-replicating-logger_and_hdfs.conf

    # Name the components on this agent
    # Note: Java properties files treat # as a comment only at the start of a line,
    # so every note below sits on its own line instead of trailing a value.
    a1.sources = r1
    # Two sinks
    a1.sinks = k1 k2
    # Two channels
    a1.channels = c1 c2

    # Describe/configure the source (netcat input)
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 44444

    # Describe the sinks
    # k2 logs events to the console
    a1.sinks.k2.type = logger

    # k1 writes events to HDFS
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/%y-%m-%d/%H/%M
    a1.sinks.k1.hdfs.filePrefix = replicating-
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.hdfs.fileType = DataStream

    # Channel selector
    a1.sources.r1.selector.type = replicating

    # Use channels which buffer events in memory
    a1.channels.c1.type = memory
    a1.channels.c2.type = memory

    # Bind the source and sinks to the channels
    # The source feeds both channels
    a1.sources.r1.channels = c1 c2
    # c1 feeds k1 (hdfs)
    a1.sinks.k1.channel = c1
    # c2 feeds k2 (logger)
    a1.sinks.k2.channel = c2
  3. Run the agent

    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/script/nc-replicating-logger_and_hdfs.conf \
    -Dflume.root.logger=INFO,console
  4. Send a message

    telnet localhost 44444

    Result: success.
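To make the replicating behavior concrete, here is a minimal Python sketch. It is a toy model, not Flume code: the `Event` class and the `replicate` function are hypothetical names, and plain lists stand in for the Channels c1 and c2.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    """Hypothetical stand-in for a Flume Event: a body plus string headers."""
    body: str
    headers: dict = field(default_factory=dict)


def replicate(event, channels):
    """Append the same event to every channel, as a replicating selector would."""
    for queue in channels.values():
        queue.append(event)


# Two plain lists stand in for the memory channels c1 and c2.
channels = {"c1": [], "c2": []}
replicate(Event("hello"), channels)

for name, queue in channels.items():
    print(name, [e.body for e in queue])
```

Both lists end up holding the same Event, which is why the message typed into telnet shows up in the logger output and in HDFS.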

Multiplexing Channel Selector

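  1. Goal

    On the way from the Source to the Channel, every Event passes through an interceptor, which adds a configured key/value pair to the Event's headers. The selector then reads that header and routes each Event to a Channel, and therefore to that Channel's Sink, according to its value. In this example three upstream Agents tag their Events with state=UA, UB, and UC and forward them over Avro to one downstream Agent, which routes UA to c1, UB to c2, and everything else to c3.

  2. Configuration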

    nc-memory-arvo1.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 44441

    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = aliyun
    a1.sinks.k1.port = 55555

    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = state
    a1.sources.r1.interceptors.i1.value = UA

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory


    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    nc-memory-arvo2.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 44442

    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = aliyun
    a1.sinks.k1.port = 55555

    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = state
    a1.sources.r1.interceptors.i1.value = UB

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory


    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    nc-memory-arvo3.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 44443

    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = aliyun
    a1.sinks.k1.port = 55555

    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = state
    a1.sources.r1.interceptors.i1.value = UC

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory


    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    avro-memory-multi.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2 k3
    a1.channels = c1 c2 c3

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 55555

    # Describe the sink
    a1.sinks.k1.type = logger

    a1.sinks.k2.type = logger

    a1.sinks.k3.type = hdfs
    a1.sinks.k3.hdfs.path = /flume/%y-%m-%d/%H/%M
    a1.sinks.k3.hdfs.filePrefix = multiplexing-
    a1.sinks.k3.hdfs.useLocalTimeStamp = true
    a1.sinks.k3.hdfs.fileType=DataStream
    a1.sinks.k3.hdfs.writeFormat=Text

    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = state
    a1.sources.r1.selector.mapping.UA = c1
    a1.sources.r1.selector.mapping.UB = c2
    a1.sources.r1.selector.default = c3

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    a1.channels.c3.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2 c3
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    a1.sinks.k3.channel = c3
  3. Run the agents

    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/script/nc-memory-arvo1.conf \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/script/nc-memory-arvo2.conf \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/script/nc-memory-arvo3.conf \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/script/avro-memory-multi.conf \
    -Dflume.root.logger=INFO,console
  4. Send messages

    telnet aliyun 44441
    telnet aliyun 44442
    telnet aliyun 44443

    Result: success.
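The routing in the multiplexing example can be modeled in a few lines of Python. This is a simplified sketch, not Flume's implementation: `static_interceptor` and `select_channel` are hypothetical helpers, and each header value maps to a single Channel name, whereas Flume allows a list of Channels per mapping.

```python
def static_interceptor(headers, key, value):
    """Return a copy of the headers with key=value added (toy static interceptor)."""
    return {**headers, key: value}


def select_channel(headers, header_name, mapping, default):
    """Pick the channel mapped to the header's value, or fall back to the default."""
    return mapping.get(headers.get(header_name), default)


# Mirrors selector.mapping.UA = c1, selector.mapping.UB = c2, selector.default = c3.
MAPPING = {"UA": "c1", "UB": "c2"}

routes = {}
for value in ("UA", "UB", "UC"):
    headers = static_interceptor({}, "state", value)
    routes[value] = select_channel(headers, "state", MAPPING, "c3")

print(routes)
```

UC has no explicit mapping, so it falls through to the default Channel c3, which is the one wired to the HDFS Sink.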

Flume Sink Processors

Two Sink processors are commonly used in Flume: the Failover Sink Processor and the Load balancing Sink Processor.

Failover Sink Processor

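The Failover Sink Processor lets an Agent group several Sinks so that they act as standbys for one another, much like a disaster-recovery setup. According to the official documentation, it maintains a prioritized list of Sinks and guarantees that Events are processed as long as at least one Sink is available.

It works by combining multiple Sinks into a Sink group in which each Sink has a priority; the Sink with the largest priority value is activated first. If a Sink fails while sending an Event, the available Sink with the next highest priority is used instead. For example, a Sink with priority 100 is activated before a Sink with priority 80. If no priority is specified, priority is determined by the order in which the Sinks are listed in the configuration.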

| Property Name | Default | Description |
| --- | --- | --- |
| sinks | (none) | Space-separated list of sinks that are participating in the group |
| processor.type | default | The component type name, needs to be failover |
| processor.priority.<sinkName> | (none) | Priority value. <sinkName> must be one of the sink instances associated with the current sink group. A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority |
| processor.maxpenalty | 30000 | The maximum backoff period for the failed Sink (in millis) |

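The priority rule described above can be sketched in Python. This is a hedged illustration only: `pick_sink` is a hypothetical function, the sink names are made up, and the model ignores the `processor.maxpenalty` cool-down that Flume applies before retrying a failed Sink.

```python
def pick_sink(priorities, failed):
    """Return the live sink with the largest priority value, or None if all failed."""
    live = {name: value for name, value in priorities.items() if name not in failed}
    if not live:
        return None
    return max(live, key=live.get)


# Hypothetical sink names using the priorities 100 and 80 from the text.
priorities = {"primary": 100, "standby": 80}

print(pick_sink(priorities, failed=set()))
print(pick_sink(priorities, failed={"primary"}))
print(pick_sink(priorities, failed={"primary", "standby"}))
```

The three calls return `primary`, then `standby`, then `None`: the highest-priority Sink is used while it is healthy, and the next one takes over only after a failure.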
  1. Goal

    ![Failover Sink Processor](https://yerias.github.io/flume_img/Failover Sink Processor.jpg)

    Agent1 sends Events through a Sink group. If the active Sink fails to deliver an Event, another Sink in the group is activated and takes over.

  2. Configuration

    nc-memory-avro.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 44444

    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000

    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = aliyun
    a1.sinks.k1.port = 55551

    # Describe the sink
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = aliyun
    a1.sinks.k2.port = 55552

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1

    avro1-memory-logger.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 55551

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    avro2-memory-logger.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 55552

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. Run the agents

    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/script/failover/nc-memory-avro.conf \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/script/failover/avro1-memory-logger.conf \
    -Dflume.root.logger=INFO,console


    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/script/failover/avro2-memory-logger.conf \
    -Dflume.root.logger=INFO,console
  4. Send a message

    telnet aliyun 44444

    Result: success.

Load balancing Sink Processor

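The Load balancing Sink Processor provides the ability to load-balance flow over multiple Sinks. It maintains an indexed list of active Sinks over which the load must be distributed. The implementation supports distributing load with either the round_robin or the random selection mechanism. The default is round_robin, which can be overridden via configuration. Custom selection mechanisms are supported through custom classes that inherit from AbstractSinkSelector.

When invoked, the selector picks the next Sink using its configured selection mechanism and invokes it. For round_robin and random, if the selected Sink fails to deliver the Event, the processor picks the next available Sink via the same mechanism. This approach does not blacklist the failing Sink; it continues to optimistically attempt every available Sink. If every Sink invocation fails, the selector propagates the failure to the Sink runner.

If backoff is enabled, the Sink processor blacklists Sinks that fail, removing them from selection for a given timeout. When the timeout ends, if the Sink is still unresponsive, the timeout is increased exponentially to avoid getting stuck in long waits on unresponsive Sinks. With backoff disabled, in round-robin mode all of a failed Sink's load is passed to the next Sink in line, so the load is not evenly balanced.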

| Property Name | Default | Description |
| --- | --- | --- |
| processor.sinks | (none) | Space-separated list of sinks that are participating in the group |
| processor.type | default | The component type name, needs to be load_balance |
| processor.backoff | false | Should failed sinks be backed off exponentially. |
| processor.selector | round_robin | Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector |
| processor.selector.maxTimeOut | 30000 | Used by backoff selectors to limit exponential backoff (in milliseconds) |

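The effect of `processor.backoff` is easier to see in a small simulation. The sketch below is a simplified Python model, not Flume's code: the class name, the 1000 ms base delay, and the explicit `now` clock (in milliseconds) are all assumptions made for illustration.

```python
class RoundRobinBalancer:
    """Toy round-robin sink selector with optional exponential backoff."""

    def __init__(self, sinks, backoff=False, max_timeout=30000):
        self.sinks = list(sinks)
        self.backoff = backoff
        self.max_timeout = max_timeout
        self.next_index = 0
        self.blocked_until = {}  # sink -> time (ms) at which it may be tried again
        self.failures = {}       # sink -> consecutive failure count

    def candidates(self, now):
        """Sinks in round-robin order, skipping any that are still backed off."""
        n = len(self.sinks)
        order = [self.sinks[(self.next_index + i) % n] for i in range(n)]
        self.next_index = (self.next_index + 1) % n
        return [s for s in order if self.blocked_until.get(s, 0) <= now]

    def deliver(self, event, send, now=0):
        """Try candidates in order; return the sink that accepted the event."""
        for sink in self.candidates(now):
            if send(sink, event):
                self.failures[sink] = 0
                return sink
            if self.backoff:
                count = self.failures.get(sink, 0) + 1
                self.failures[sink] = count
                # Assumed base delay of 1000 ms, doubled per failure, capped.
                delay = min(self.max_timeout, 1000 * 2 ** (count - 1))
                self.blocked_until[sink] = now + delay
        raise RuntimeError("all sinks failed")


def make_sender(down):
    """Build a fake send function that fails for sinks in `down` and logs attempts."""
    attempts = []

    def send(sink, event):
        attempts.append(sink)
        return sink not in down

    return send, attempts


results = {}
for backoff in (False, True):
    balancer = RoundRobinBalancer(["k1", "k2"], backoff=backoff)
    send, attempts = make_sender(down={"k2"})
    delivered = [balancer.deliver(f"e{i}", send, now=i) for i in range(4)]
    results[backoff] = (delivered, attempts.count("k2"))
    print(backoff, delivered, "attempts on k2:", attempts.count("k2"))
```

In both runs k1 ends up delivering all four Events, but without backoff the dead k2 is tried on every other Event, while with backoff it is tried once and then skipped until its timeout expires.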
  1. Goal

    ![Load balancing Sink Processor](https://yerias.github.io/flume_img/Load balancing Sink Processor.jpg)

    Agent1 sends Events, and the Sink group's processor uses the configured selection mechanism to decide which Sink delivers each one.

  2. Configuration

    nc-memory-avro.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 44444

    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = random

    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = aliyun
    a1.sinks.k1.port = 55551

    # Describe the sink
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = aliyun
    a1.sinks.k2.port = 55552

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1

    avro1-memory-logger.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 55551

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    avro2-memory-logger.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 55552

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. Run the agents

    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/script/load_balancing/nc-memory-avro.conf \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/script/load_balancing/avro1-memory-logger.conf \
    -Dflume.root.logger=INFO,console


    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/script/load_balancing/avro2-memory-logger.conf \
    -Dflume.root.logger=INFO,console
  4. Send a message

    telnet aliyun 44444

    Result: success.

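The Two Channel Types

The two commonly used Channel types are File and Memory. The Memory Channel keeps Events in memory, so it is fast, but Events are lost if the Agent process dies. The File Channel writes every Event to disk, so it is slower, but Events survive a restart.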
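The durability trade-off can be illustrated with a toy Python model. It does not reflect how Flume's File Channel is actually implemented; `MemoryQueue`, `FileQueue`, and the file name `channel.log` are hypothetical, and a "restart" is simulated simply by constructing fresh objects.

```python
import json
import os
import tempfile


class MemoryQueue:
    """Events live only in this object; a new instance starts empty."""

    def __init__(self):
        self.events = []

    def put(self, event):
        self.events.append(event)


class FileQueue:
    """Events are appended to a file, so a new instance can reload them."""

    def __init__(self, path):
        self.path = path
        self.events = []
        if os.path.exists(path):
            with open(path, encoding="utf-8") as f:
                self.events = [json.loads(line) for line in f]

    def put(self, event):
        # Flushing and syncing on every write is what makes this slow but durable.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")
            f.flush()
            os.fsync(f.fileno())
        self.events.append(event)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "channel.log")
    memory, durable = MemoryQueue(), FileQueue(path)
    memory.put("hello")
    durable.put("hello")

    # Simulate an agent restart by building fresh objects.
    memory_after = MemoryQueue().events
    durable_after = FileQueue(path).events

print(memory_after, durable_after)
```

After the simulated restart the in-memory queue is empty while the file-backed queue still holds the Event, at the cost of a disk write per `put`.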

Author: Tunan
Link: http://yerias.github.io/2018/12/03/flume/3/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.