项目数据的采集

系统架构图:

数据仓库架构项目图

目标

Flume是用来做什么的

为什么要使用Flume

Flume具体怎么用

java客户端上传消息到Flume写到HDFS

解决Flume采集数据时生成大量小文件的问题

为什么要使用Flume

在开源框架的选择中,因为Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. 所以我们选择了Flume作为数据的采集工具

Flume是用来做什么的

从系统架构图上来看,用户只要产生行为,那么日志就会在Nginx服务器中保存,所以我们现在要做的就是把数据从Nginx服务器中使用Flume采集到HDFS上

Flume具体怎么用

Flume

Flume就是一个针对日志数据进行采集和汇总的一个框架

Flume的进程叫做Agent,每个Agent中有Srouce、Channel、Sink

Flume从使用层面来讲就是写配置文件,其实就是配置我们的Agent,只要学会从官网查配置就行了

Flume官网

Agent

Source中的常用方式有 avro、exec、spooling、taildir、kafka

Channel中的常用方式有 memory、kafka、file

Sink中的常用方式有 hdfs、logger、avro、kafka

示例配置

它描述了一个单节点Flume部署。该配置允许用户生成事件,然后将它们记录到控制台。以后写配置只需要在这个配置文件的基础上做出选择性的修改即可

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

有了这个配置文件,我们可以按如下方式启动Flume:

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/example.conf \
-Dflume.root.logger=INFO,console

输入端:

telnet localhost 44444

TAILDIR

taildir是Source端可以选择的一个类型,它可以同时支持目录和文件,并且支持offset,Flume挂了重启后可以接着上次消费的地方继续消费,下面提供一个配置taildir的文件

# Describe/configure the source
a1.sources.r1.type = taildir
a1.sources.r1.positionFile = /home/hadoop/taildir_position.json #这个目录的上一级目录不能存在
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/hadoop/data/flume/example
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /home/hadoop/data/flume/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

其中taildir_position.json文件是用来记录文件读取offset的位置,方便下次继续从offset位置读取

注意taildir_position.json文件不能存在上级目录,不然会报错

java客户端上传消息到Flume写到HDFS

参考博客: https://blog.csdn.net/lbship/article/details/85336555

  1. 在java客户端编写代码生成logger数据

    添加依赖

    <dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.6.0</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.21</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.6.0</version>
    </dependency>

    生产日志

    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());
    public static void main(String[] args) throws InterruptedException {
    int i = 0;
    while(i<99){
    Thread.sleep(500);
    logger.info("now is : "+i);
    i++;
    }
    }
  2. 在log4j.properties文件中添加代码

    log4j.rootCategory=info,console,flume  //rootCategory需要指定flume

    log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname=121.196.220.143
    log4j.appender.flume.Port=41414
    log4j.appender.flume.UnsafeMode=true
  3. 在Hostname服务端接收log4j.properties文件中指定的端口即可,注意Source类型需要使用avro(序列化)

    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 41414
  4. 写出到HDFS,这里需要注意解决时间戳和乱码问题

    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/%y-%m-%d/%H%M/%S #时间需要根据实际情况修改
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.useLocalTimeStamp=true #解决时间戳问题
    a1.sinks.k1.hdfs.fileType=DataStream #解决乱码

    使用useLocalTimeStamp=true参数解决时间戳异常,使用fileType=DataStream解决文件内容乱码问题

解决Flume采集数据时生成大量小文件的问题

在使用Flume采集数据时,由于默认参数的影响会生产大量小文件,我们先看默认参数

hdfs.rollInterval	30	滚动当前文件之前要等待的秒数
hdfs.rollSize 1024 触发滚动当前文件的大小,单位bytes(B)
hdfs.rollCount 10 触发滚动当前文件的events数量

我们看到默认生成文件有三个条件,每30秒、每1M、每10个events,这样的配置会生成大量的小文件,所以我们要对这三个文件进行修改

最终生成的文件必须综合时间、文件大小、event数量来决定,时间太长或者文件太大都不利于最终生成的文件。该时间还需要配合hdfs.path参数指定的生成文件时间。

注意sink.type如果是memory模式,注意文件的大小,防止内存不足,太大可以设置sink.type = file

Author: Tunan
Link: http://yerias.github.io/2020/02/16/offlinedw/3.%E9%A1%B9%E7%9B%AE%E6%95%B0%E6%8D%AE%E7%9A%84%E9%87%87%E9%9B%86/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.