Flume Secondary Development: Custom Source, Sink, Interceptor, and the Channel Transaction Guarantee
  1. Agent architecture
  2. Custom Source
  3. Custom Sink
  4. Custom Interceptor
  5. The Channel transaction guarantee

Agent architecture

[Figure: Flume Agent architecture]

Custom Source

Tip: when you are not sure how to write something, reading the Flume source code is a good place to start.

Before customizing any Flume component, add the Flume dependency to your IDEA/Maven project:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.6.0</version>
</dependency>

The purpose of a Source is to receive data from an external client and store it into the configured Channels. A Source can get an instance of its own ChannelProcessor to process an Event, and the Event is committed within a Channel-local transaction.

In the case of an exception, required Channels propagate the exception, and all Channels roll back their Events, but Events previously processed on other Channels remain committed.
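
Before the full example below, note that the ChannelProcessor can also commit a whole list of Events in a single Channel-local transaction via processEventBatch(). A minimal sketch, assuming the same flume-ng-core 1.6.0 dependency (the class name BatchSourceSketch and the hard-coded batch of ten "hello" events are purely illustrative):

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BatchSourceSketch extends AbstractSource implements PollableSource {

    @Override
    public Status process() throws EventDeliveryException {
        // Build a small batch of Events
        List<Event> batch = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            batch.add(EventBuilder.withBody(("hello-" + i).getBytes(StandardCharsets.UTF_8)));
        }
        // One call, one Channel-local transaction for the whole batch
        getChannelProcessor().processEventBatch(batch);
        return Status.READY;
    }
}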

A custom Source can generate data directly at the data source, and you can customize the generated data (for example with a prefix and a suffix).

Custom Source class, modeled on the official demo / GitHub:

package tunan.hadoop.flume.sources;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // Custom properties: prefix and suffix
    private String prefix;
    private String suffix;

    // Generate and process Events
    @Override
    public Status process() throws EventDeliveryException {
        // Status to return
        Status status;

        try {
            // Simulate generating data
            for (int i = 0; i < 100; i++) {
                // Create the Event
                SimpleEvent event = new SimpleEvent();

                // Put the data into the body; note that the header stays empty
                event.setBody((prefix + i + suffix).getBytes());
                // Hand the Event to the ChannelProcessor
                getChannelProcessor().processEvent(event);
            }
            // Success
            status = Status.READY;
        } catch (Exception e) {
            e.printStackTrace();
            // Failure: back off
            status = Status.BACKOFF;
        }

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Return the result status
        return status;
    }

    // Read the parameters passed in from the Agent configuration
    @Override
    public void configure(Context context) {
        this.prefix = context.getString("prefix", "TUNAN");
        this.suffix = context.getString("suffix");
    }
}
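
A version-dependent side note: when compiling against flume-ng-core 1.7.0 or later, PollableSource also declares two back-off getters, so the class above would additionally need overrides along these lines (the values shown are just illustrative):

// Only needed with Flume 1.7+; not required for the 1.6.0 dependency used here.
@Override
public long getBackOffSleepIncrement() {
    return 1000L;   // how much to increase the back-off sleep between idle polls (ms)
}

@Override
public long getMaxBackOffSleepInterval() {
    return 5000L;   // upper bound for the back-off sleep (ms)
}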

Upload the jar to $FLUME_HOME/lib, then modify the Source in the configuration file: change the type and add the custom parameters defined in the class.

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Fully qualified class name of our custom Source plus the parameters we defined in the class
a1.sources.r1.type = tunan.hadoop.flume.sources.MySource
a1.sources.r1.prefix = tunan:
a1.sources.r1.suffix = -6639

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the Agent:

flume-ng agent \
--conf /home/hadoop/app/flume/conf \
--conf-file /home/hadoop/app/flume/script/MySource.conf \
--name a1 \
-Dflume.root.logger=INFO,console

Check the result

[Figure: console output of MySource]

Custom Sink

Tip: when you are not sure how to write something, reading the Flume source code is a good place to start.

The purpose of a Sink is to extract Events from the Channel and forward them to the next Flume Agent in the flow, or to store them in an external repository.

A Sink is associated with exactly one Channel, as configured in the Flume properties file.

There is one SinkRunner instance associated with every configured Sink. When the Flume framework calls SinkRunner.start(), a new thread is created to drive the Sink (using the SinkRunner). This thread manages the Sink's lifecycle. The Sink needs to implement the start() and stop() methods, which are part of the LifecycleAware interface. start() should initialize the Sink and bring it to a state where it can forward Events to its next destination. process() should do the core processing of extracting an Event from the Channel and forwarding it. stop() should do the necessary cleanup (for example, releasing resources).
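
The MySink example below only overrides process() and configure(). A rough sketch of how the lifecycle hooks can be used (the single-thread executor here is just an illustrative resource, not something the article's MySink needs):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class LifecycleSinkSketch extends AbstractSink implements Configurable {

    private ExecutorService executor;

    @Override
    public synchronized void start() {
        // Initialize resources before the SinkRunner thread starts calling process()
        executor = Executors.newSingleThreadExecutor();
        super.start();
    }

    @Override
    public synchronized void stop() {
        // Release resources when the Agent shuts the Sink down
        if (executor != null) {
            executor.shutdown();
        }
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        // Core take-and-forward logic goes here (see MySink below)
        return Status.BACKOFF;
    }

    @Override
    public void configure(Context context) {
        // Read parameters from the Agent configuration here
    }
}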

Take the data (Events) from the Channel and output them through our custom Sink; the architecture is:

nc source ==> memory channel ==> MySink

Note: nc keeps messages in order, while telnet does not guarantee message ordering.

Code implementation:

package tunan.hadoop.flume.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    // Get a logger
    private static final Logger logger = LoggerFactory.getLogger(MySink.class);

    // Prefix and suffix passed in as parameters
    private String prefix;
    private String suffix;

    // Take data from the Channel and send it to the destination
    @Override
    public Status process() throws EventDeliveryException {
        Status status;

        // Get the Channel
        Channel ch = getChannel();
        // Get a Transaction
        Transaction txn = ch.getTransaction();
        // Begin the Transaction
        txn.begin();
        try {
            // Take an Event
            Event event;

            do { // Keep polling until we get a non-null Event
                event = ch.take();
            } while (event == null);

            // The body is a byte array; convert it to a String
            String body = new String(event.getBody());

            // Print to the console
            logger.error(prefix + body + suffix);

            // Commit the transaction
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            // Roll back the transaction
            txn.rollback();
            status = Status.BACKOFF;

            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
        return status;
    }

    // Read the parameters from the Agent configuration
    @Override
    public void configure(Context context) {
        this.prefix = context.getString("prefix", "Tunan");
        this.suffix = context.getString("suffix");
    }
}
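
One design note: MySink takes a single Event per transaction, which is simple but slow. Production sinks usually take a batch of Events inside one transaction. A rough sketch of what the transactional part of process() could look like with batching (batchSize is a hypothetical field that would be read in configure(); it is not part of the article's MySink):

txn.begin();
try {
    int taken = 0;
    for (int i = 0; i < batchSize; i++) {
        Event event = ch.take();
        if (event == null) {
            break;                  // the Channel is drained for now
        }
        logger.error(prefix + new String(event.getBody()) + suffix);
        taken++;
    }
    txn.commit();
    // Back off when nothing was available so we do not spin on an empty Channel
    status = (taken == 0) ? Status.BACKOFF : Status.READY;
} catch (Throwable t) {
    txn.rollback();
    status = Status.BACKOFF;
} finally {
    txn.close();
}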

Upload the jar to $FLUME_HOME/lib, then modify the Sink in the configuration file: change the type and add the custom parameters defined in the class.

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Fully qualified class name of our custom Sink plus the parameters we defined in the class
a1.sinks.k1.type = tunan.hadoop.flume.sink.MySink
a1.sinks.k1.prefix = tunan-sink:
a1.sinks.k1.suffix = -flie

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Send messages

[hadoop@hadoop ~]$ nc localhost 44444
1
OK
2
OK
3
OK
..
0
OK

Check the result
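
Given the sink code and the configured prefix/suffix, each line typed into nc should be printed on the Agent's console wrapped as tunan-sink:<line>-flie, e.g. tunan-sink:1-flie.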

Custom Interceptor

Tip: when you are not sure how to write something, reading the Flume source code is a good place to start.

Flume has the ability to modify or drop Events in flight. This is done with the help of interceptors. An interceptor is a class that implements the org.apache.flume.interceptor.Interceptor interface. An interceptor can modify or even drop Events based on any criteria chosen by the interceptor's developer.

Flume supports chaining interceptors. This is done by specifying a list of interceptor builder class names in the configuration. Interceptors are specified as a whitespace-separated list in the Source configuration, and they are invoked in the order in which they are specified. The list of Events returned by one interceptor is passed to the next interceptor in the chain.
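
For example, a Source with two chained interceptors might be configured like this (i1, i2 and the built-in timestamp interceptor are only an illustration, not part of this article's setup):

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = tunan.hadoop.flume.interceptor.MyInterceptor$Builder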

Interceptors can modify or drop Events. If an interceptor needs to drop an Event, it simply does not include that Event in the list it returns. If it needs to drop all Events, it returns an empty list.
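
The MyInterceptor below only tags headers and never drops anything. As a rough sketch, a dropping interceptor's list method could look like this (filtering on "DEBUG" is just an example criterion):

// Sketch only: drop every Event whose body contains "DEBUG" by not returning it
@Override
public List<Event> intercept(List<Event> events) {
    List<Event> kept = new ArrayList<>();
    for (Event event : events) {
        String body = new String(event.getBody());
        if (!body.contains("DEBUG")) {
            kept.add(event);
        }
    }
    return kept;   // dropped Events are simply absent from the returned list
}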

Requirement: all the data coming into Flume arrives mixed together, and the data of some business lines is important enough to be pulled out separately. Here we write a custom interceptor and combine it with the Multiplexing Channel Selector to route Events whose body contains gifshow into their own flow.

[Figure: custom interceptor with multiplexing channel selector topology]

Code for the custom interceptor:

package tunan.hadoop.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class MyInterceptor implements Interceptor {
    // Reusable list for handling batches of Events
    private List<Event> newEvents;

    // Initialization
    @Override
    public void initialize() {
        newEvents = new ArrayList<>();
    }

    // Intercept a single Event
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody());
        if (body.contains("gifshow")) {
            // Set a custom header
            headers.put("type", "gifshow");
        } else {
            headers.put("type", "other");
        }

        return event;
    }

    // Intercept a batch of Events
    @Override
    public List<Event> intercept(List<Event> events) {
        // Reset the list on every call
        newEvents.clear();
        Iterator<Event> iter = events.iterator();
        while (iter.hasNext()) {
            Event next = iter.next();
            // Delegate to the single-Event method and collect the result
            newEvents.add(intercept(next));
        }

        // Return the intercepted Events
        return newEvents;
    }

    @Override
    public void close() {

    }

    // See HostInterceptor in the Flume source: a static inner class named Builder is required
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

Configure the first Agent

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = tunan.hadoop.flume.interceptor.MyInterceptor$Builder

# multiplexing selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
# Must match the header values set in the custom interceptor
a1.sources.r1.selector.mapping.gifshow = c1
a1.sources.r1.selector.mapping.other = c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop
a1.sinks.k1.port = 4441

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop
a1.sinks.k2.port = 4442

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Configure the second Agent

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop
a1.sources.r1.port = 4441

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Configure the third Agent

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop
a1.sources.r1.port = 4442

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the Agents. Start Agent 2 and Agent 3 first; otherwise Agent 1's avro sinks cannot connect and will report errors.

flume-ng agent \
--conf /home/hadoop/app/flume/conf \
--conf-file /home/hadoop/app/flume/script/Interceptor_3.conf \
--name a1 \
-Dflume.root.logger=INFO,console

flume-ng agent \
--conf /home/hadoop/app/flume/conf \
--conf-file /home/hadoop/app/flume/script/Interceptor_2.conf \
--name a1 \
-Dflume.root.logger=INFO,console

flume-ng agent \
--conf /home/hadoop/app/flume/conf \
--conf-file /home/hadoop/app/flume/script/Interceptor_1.conf \
--name a1 \
-Dflume.root.logger=INFO,console

Send messages

[hadoop@hadoop ~]$ nc localhost 44444
gifshow
OK
aaaaa
OK
aaaagifshow
OK
figshow
OK

Check the result
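
Based on the interceptor and the selector mapping above, gifshow and aaaagifshow carry the header type=gifshow and should be logged by Agent 2 (port 4441), while aaaaa and figshow carry type=other and should be logged by Agent 3 (port 4442).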

The Channel transaction guarantee

The Transaction interface is the basis of Flume's reliability; all major components (Sources, Sinks and Channels) must use a Flume Transaction.

A Transaction is implemented within the connected Channel, and every Source and Sink connected to a Channel must obtain a Transaction object.

Sources use the ChannelProcessor to manage Transactions, while Sinks manage them explicitly through their configured Channel.

The operation of staging an Event (putting it into a Channel) or extracting an Event (taking it out of a Channel) must be done inside an active Transaction.
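
A minimal sketch of that pattern on the put side, essentially the same shape as the take side already shown in MySink (the "Hello Flume!" payload is just an example):

import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.event.EventBuilder;

// Sketch: stage one Event into a Channel inside an explicit Transaction
public class PutTransactionSketch {
    public static void put(Channel ch) {
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            Event event = EventBuilder.withBody("Hello Flume!".getBytes(StandardCharsets.UTF_8));
            ch.put(event);
            txn.commit();       // the Event is now visible to the take side
        } catch (Throwable t) {
            txn.rollback();     // nothing from this Transaction reaches the Channel
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
    }
}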

![Channel Transaction](https://yerias.github.io/flume_img/Channel Trancastion.jpg)

For the memory channel, all of this transaction management is done by the MemoryChannel class; see the source code for the details.
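
Roughly, from the MemoryChannel source: each transaction keeps a private putList and takeList; put() only appends to putList, take() moves an Event from the shared queue into takeList, commit() drains putList into the shared queue, and rollback() pushes the Events in takeList back into the queue. That is what makes the take-and-forward loop in MySink safe to roll back.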
