

Flume的Channel选择器有Replicating Channel Selector(默认)和Multiplexing Channel Selector两种,作用分别是复制和多路分发;不做配置时默认使用Replicating(复制)。

下面我们用两个案例分别实现Replicating Channel Selector和Multiplexing Channel Selector

Replicating Channel Selector

  1. 需要实现的功能


  2. 代码实现

# Name the components on this agent
# NOTE: a Flume config is a Java properties file. '#' only starts a comment
# when it is the first non-blank character of a line; text appended after a
# value (e.g. "k1 k2 #comment" or "c1 //comment") becomes part of the value.
# Two sinks (k1, k2) and two channels (c1, c2)
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source: netcat source, fed with telnet
# (bind must not be empty; localhost matches `telnet localhost 44444`)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sinks
# k2: logger sink
a1.sinks.k2.type = logger

# k1: HDFS sink, one directory per minute (path escapes resolved with local time)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/%y-%m-%d/%H/%M
a1.sinks.k1.hdfs.filePrefix = replicating-
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Channel selector: replicating sends a copy of every event to each channel of r1
a1.sources.r1.selector.type = replicating

# Use two channels which buffer events in memory
a1.channels.c1.type = memory
a1.channels.c2.type = memory

# Bind the source and sinks to the channels
# source r1 -> c1 and c2
a1.sources.r1.channels = c1 c2
# c1 -> k1 (hdfs)
a1.sinks.k1.channel = c1
# c2 -> k2 (logger)
a1.sinks.k2.channel = c2
  3. 执行命令
# Start agent a1 with the replicating config.
# The last line must not end in '\' (it would swallow the following line);
# -Dflume.root.logger=INFO,console prints Flume's log, including the events
# written by the logger sink, to this terminal.
flume-ng agent \
--name a1 \
--conf "$FLUME_HOME/conf" \
--conf-file "$FLUME_HOME/script/nc-replicating-logger_and_hdfs.conf" \
-Dflume.root.logger=INFO,console
  4. 发送消息

    # Connect to the netcat source r1 (localhost:44444); typed lines are sent to the agent
    telnet localhost 44444


Multiplexing Channel Selector

  1. 需要实现的功能



  2. 代码实现


    # presumably saved as nc-memory-arvo1.conf (first netcat agent)
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source: netcat on port 44441
    # (bind must not be empty; the host matches `telnet aliyun 44441`)
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 44441

    # Describe the sink: forward to the aggregating agent's avro source on aliyun:55555
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = aliyun
    a1.sinks.k1.port = 55555

    # Static interceptor: stamp each event with header state=UA,
    # the header the downstream multiplexing selector routes on
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = state
    a1.sources.r1.interceptors.i1.value = UA

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


    # presumably saved as nc-memory-arvo2.conf (second netcat agent)
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source: netcat on port 44442
    # (bind must not be empty; the host matches `telnet aliyun 44442`)
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 44442

    # Describe the sink: forward to the aggregating agent's avro source on aliyun:55555
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = aliyun
    a1.sinks.k1.port = 55555

    # Static interceptor: stamp each event with header state=UB,
    # the header the downstream multiplexing selector routes on
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = state
    a1.sources.r1.interceptors.i1.value = UB

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    vim nc-memory-arvo3.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source: netcat on port 44443
    # (bind must not be empty; the host matches `telnet aliyun 44443`)
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 44443

    # Describe the sink: forward to the aggregating agent's avro source on aliyun:55555
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = aliyun
    a1.sinks.k1.port = 55555

    # Static interceptor: stamp each event with header state=UC.
    # UC has no explicit mapping downstream, so it goes to the selector's default channel.
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = state
    a1.sources.r1.interceptors.i1.value = UC

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    vim avro-memory-multi.conf

    # Aggregating agent: one avro source fans events out to three channels
    # according to the value of the "state" header.
    a1.sources = r1
    a1.channels = c1 c2 c3
    a1.sinks = k1 k2 k3

    # Avro source that the three upstream netcat agents send to
    a1.sources.r1.type = avro
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 55555
    a1.sources.r1.channels = c1 c2 c3

    # Multiplexing selector keyed on header "state":
    #   UA -> c1, UB -> c2, any other value (e.g. UC) -> default c3
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = state
    a1.sources.r1.selector.mapping.UA = c1
    a1.sources.r1.selector.mapping.UB = c2
    a1.sources.r1.selector.default = c3

    # One in-memory channel per route
    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    a1.channels.c3.type = memory

    # Route c1 -> k1 (logger)
    a1.sinks.k1.type = logger
    a1.sinks.k1.channel = c1

    # Route c2 -> k2 (logger)
    a1.sinks.k2.type = logger
    a1.sinks.k2.channel = c2

    # Route c3 -> k3 (HDFS, one directory per minute, path escapes use local time)
    a1.sinks.k3.type = hdfs
    a1.sinks.k3.hdfs.path = /flume/%y-%m-%d/%H/%M
    a1.sinks.k3.hdfs.filePrefix = multiplexing-
    a1.sinks.k3.hdfs.useLocalTimeStamp = true
    a1.sinks.k3.channel = c3
  3. 执行命令

    # Run each agent in its own terminal.
    # Start the aggregator first: the avro sinks of the three netcat agents
    # connect to its avro source on aliyun:55555.
    # No command may end in '\' on its last line (it would swallow the next line);
    # -Dflume.root.logger=INFO,console prints the log (incl. logger-sink events) to the console.
    flume-ng agent \
    --name a1 \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$FLUME_HOME/script/avro-memory-multi.conf" \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$FLUME_HOME/script/nc-memory-arvo1.conf" \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$FLUME_HOME/script/nc-memory-arvo2.conf" \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$FLUME_HOME/script/nc-memory-arvo3.conf" \
    -Dflume.root.logger=INFO,console
  4. 发送消息

    # Each port belongs to a different netcat agent whose static interceptor sets
    # header state=UA / UB / UC; the aggregator's multiplexing selector then routes
    # UA -> c1 -> k1 (logger), UB -> c2 -> k2 (logger), UC -> default c3 -> k3 (hdfs).
    # Use one terminal per telnet session.
    telnet aliyun 44441
    telnet aliyun 44442
    telnet aliyun 44443



Flume的Sink处理器(Sink Processor)常用的有两种:Failover Sink Processor和Load balancing Sink Processor

Failover Sink Processor

Failover Sink Processor可以在Agent中的Sink端做一个类似于灾备的Sink组,官方文档中介绍Failover Sink Processor维护一个按优先级排序的Sink列表,确保只要有一个可用的Sink,就会处理Event。

Failover Sink Processor的工作方式是将多个Sink组成Sinks组,他们有一个优先级的顺序关系,优先级大的先被激活。如果Sink在发送Event时失败,则下一个具有最高优先级的Sink将会用于发送Event。例如,优先级为100的接收器在优先级为80的接收器之前被激活。如果没有指定优先级,则根据在配置中指定Sink的顺序确定优先级。

Property Name Default Description
sinks Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be failover
processor.priority.<sinkName> – Priority value. <sinkName> must be one of the sink instances associated with the current sink group. A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority
processor.maxpenalty 30000 The maximum backoff period for the failed Sink (in millis)
  1. 需要实现的功能

    ![Failover Sink Processor](https://yerias.github.io/flume_img/Failover%20Sink%20Processor.jpg)


  2. 代码实现


    # failover/nc-memory-avro.conf
    # Front agent: netcat source -> one memory channel -> failover sink group g1 (k1, k2)
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1 k2

    # netcat source listening on aliyun:44444
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 44444
    a1.sources.r1.channels = c1

    # A single in-memory channel shared by both sinks
    a1.channels.c1.type = memory

    # k1: avro sink towards aliyun:55551
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = aliyun
    a1.sinks.k1.port = 55551
    a1.sinks.k1.channel = c1

    # k2: avro sink towards aliyun:55552
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = aliyun
    a1.sinks.k2.port = 55552
    a1.sinks.k2.channel = c1

    # Failover group: the larger priority is activated first, so k2 (10) is
    # used while healthy and k1 (5) takes over when k2 fails.
    # maxpenalty caps the backoff of a failed sink at 10000 ms.
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000


    # presumably failover/avro1-memory-logger.conf
    # Downstream agent for sink k1: avro source on aliyun:55551 -> memory channel -> logger
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # Avro source receiving from the front agent's k1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 55551
    a1.sources.r1.channels = c1

    # In-memory buffer
    a1.channels.c1.type = memory

    # Print received events via the logger sink
    a1.sinks.k1.type = logger
    a1.sinks.k1.channel = c1


    # presumably failover/avro2-memory-logger.conf
    # Downstream agent for sink k2: avro source on aliyun:55552 -> memory channel -> logger
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # Avro source receiving from the front agent's k2
    a1.sources.r1.type = avro
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 55552
    a1.sources.r1.channels = c1

    # In-memory buffer
    a1.channels.c1.type = memory

    # Print received events via the logger sink
    a1.sinks.k1.type = logger
    a1.sinks.k1.channel = c1
  3. 执行命令

    # Run each agent in its own terminal.
    # Start the two downstream avro/logger agents first, because the front
    # agent's avro sinks connect to aliyun:55551 and aliyun:55552.
    # No command may end in '\' on its last line (it would swallow the next line);
    # -Dflume.root.logger=INFO,console prints the log (incl. logger-sink events) to the console.
    flume-ng agent \
    --name a1 \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$FLUME_HOME/script/failover/avro1-memory-logger.conf" \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$FLUME_HOME/script/failover/avro2-memory-logger.conf" \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$FLUME_HOME/script/failover/nc-memory-avro.conf" \
    -Dflume.root.logger=INFO,console
  4. 发送消息

    # Send events to the front agent's netcat source (aliyun:44444); they should appear on
    # the higher-priority k2 side (port 55552) until that agent is stopped, then on k1's side
    telnet aliyun 44444


Load balancing Sink Processor

Load balancing Sink Processor提供了在多个Sink 上实现负载均衡的能力。它维护一个活动Sink的索引列表,发送的Event必须分布在这个列表上。实现支持通过round_robin或random分配负载。默认为round_robin类型,但是可以通过配置覆盖。自定义选择机制是通过继承AbstractSinkSelector的自定义类来支持的。

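调用时,选择器使用其配置的选择机制选择下一个Sink并调用它。对于round_robin和random,如果选中的Sink不能传递Event,处理器将通过其配置的选择机制选择下一个可用的Sink。在未开启backoff(processor.backoff=false,即默认值)时,这种方法不会将失败的Sink加入黑名单,而是继续乐观地尝试每个可用的Sink。开启backoff后(如下面的例子),失败的Sink会在一段超时时间内被加入黑名单,且超时时间按指数增长,上限由processor.selector.maxTimeOut控制。如果所有的Sink调用都失败,选择器会把失败传递给Sink运行器(SinkRunner),而不是让整个程序退出。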


Property Name Default Description
sinks Space-separated list of sinks that are participating in the group (configured as a1.sinkgroups.g1.sinks, see the example below)
processor.type default The component type name, needs to be load_balance
processor.backoff false Should failed sinks be backed off exponentially.
processor.selector round_robin Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector
processor.selector.maxTimeOut 30000 Used by backoff selectors to limit exponential backoff (in milliseconds)
  1. 需要实现的功能

    ![Load balancing Sink Processor](https://yerias.github.io/flume_img/Load%20balancing%20Sink%20Processor.jpg)


  2. 代码实现


    # load_balancing/nc-memory-avro.conf
    # Front agent: netcat source -> one memory channel -> load-balancing sink group g1 (k1, k2)
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1 k2

    # netcat source listening on aliyun:44444
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 44444
    a1.sources.r1.channels = c1

    # A single in-memory channel shared by both sinks
    a1.channels.c1.type = memory

    # k1: avro sink towards aliyun:55551
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = aliyun
    a1.sinks.k1.port = 55551
    a1.sinks.k1.channel = c1

    # k2: avro sink towards aliyun:55552
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = aliyun
    a1.sinks.k2.port = 55552
    a1.sinks.k2.channel = c1

    # Load-balancing group: each event goes to k1 or k2 chosen at random;
    # backoff=true makes failed sinks back off exponentially before being retried.
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = random


    # presumably load_balancing/avro1-memory-logger.conf
    # Downstream agent for sink k1: avro source on aliyun:55551 -> memory channel -> logger
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # Avro source receiving from the front agent's k1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 55551
    a1.sources.r1.channels = c1

    # In-memory buffer
    a1.channels.c1.type = memory

    # Print received events via the logger sink
    a1.sinks.k1.type = logger
    a1.sinks.k1.channel = c1


    # presumably load_balancing/avro2-memory-logger.conf
    # Downstream agent for sink k2: avro source on aliyun:55552 -> memory channel -> logger
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # Avro source receiving from the front agent's k2
    a1.sources.r1.type = avro
    a1.sources.r1.bind = aliyun
    a1.sources.r1.port = 55552
    a1.sources.r1.channels = c1

    # In-memory buffer
    a1.channels.c1.type = memory

    # Print received events via the logger sink
    a1.sinks.k1.type = logger
    a1.sinks.k1.channel = c1
  3. 执行命令

    # Run each agent in its own terminal.
    # Start the two downstream avro/logger agents first, because the front
    # agent's avro sinks connect to aliyun:55551 and aliyun:55552.
    # No command may end in '\' on its last line (it would swallow the next line);
    # -Dflume.root.logger=INFO,console prints the log (incl. logger-sink events) to the console.
    flume-ng agent \
    --name a1 \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$FLUME_HOME/script/load_balancing/avro1-memory-logger.conf" \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$FLUME_HOME/script/load_balancing/avro2-memory-logger.conf" \
    -Dflume.root.logger=INFO,console

    flume-ng agent \
    --name a1 \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$FLUME_HOME/script/load_balancing/nc-memory-avro.conf" \
    -Dflume.root.logger=INFO,console
  4. 发送消息

    # Send events to the front agent's netcat source (aliyun:44444); with the random
    # selector they should be spread across the two downstream logger agents
    telnet aliyun 44444




Author: Tunan
Link: http://yerias.github.io/2018/12/03/flume/3/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.