Which Flume source should you pick? Taildir source is the first choice!
It can resume from a breakpoint via its positionFile,
which records the read offset of every tailed file.
You can configure file groups, each using a regular expression to match several files to monitor.
For all its strengths, taildir source has one imperfection: it cannot monitor directories recursively.
So the only option is to modify the source code. Note that the change works on both the Apache release and the CDH release; the version I use here is flume-ng-1.6.0-cdh5.16.2, but compiling the Apache sources with the same change works just as well.
To change the source, first read the source. When Flume's taildir source starts, it calls the start()
method to initialize; inside, it creates a ReliableTaildirEventReader
using the builder pattern:
```java
@Override
public synchronized void start() {
  logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths);
  try {
    reader = new ReliableTaildirEventReader.Builder()
        .filePaths(filePaths)
        .headerTable(headerTable)
        .positionFilePath(positionFilePath)
        .skipToEnd(skipToEnd)
        .addByteOffset(byteOffsetHeader)
        .cachePatternMatching(cachePatternMatching)
        .recursive(isRecursive)
        .annotateFileName(fileHeader)
        .fileNameHeader(fileHeaderKey)
        .build();
  } catch (IOException e) {
    throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
  }
  idleFileChecker = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
  idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(),
      idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS);
  positionWriter = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
  positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(),
      writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS);
  super.start();
  logger.debug("TaildirSource started");
  sourceCounter.start();
}
```
Taildir source is a PollableSource:

```java
public interface PollableSource extends Source { ...
```

The Javadoc on this interface means that a PollableSource
needs an external driver to check whether there are events to consume and to pull them — in plain terms, it polls on a timer. So Flume is not strictly real-time; it just checks for events again and again at short intervals. (The counterpart is the EventDrivenSourceRunner.)
When the taildir source is polled for events, the process()
method is invoked:
```java
@Override
public Status process() {
  Status status = Status.READY;
  try {
    existingInodes.clear();
    existingInodes.addAll(reader.updateTailFiles());
    for (long inode : existingInodes) {
      TailFile tf = reader.getTailFiles().get(inode);
      if (tf.needTail()) {
        tailFileProcess(tf, true);
      }
    }
    closeTailFiles();
    try {
      TimeUnit.MILLISECONDS.sleep(retryInterval);
    } catch (InterruptedException e) {
      logger.info("Interrupted while sleeping");
    }
  } catch (Throwable t) {
    logger.error("Unable to tail files", t);
    status = Status.BACKOFF;
  }
  return status;
}
```
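The polling model described above can be sketched as a tiny self-contained loop. Everything here (PollRunner, PollableLike, drive) is a hypothetical stand-in for illustration, not Flume's actual SourceRunner classes:

```java
// Minimal sketch of external-driver polling: call process() repeatedly,
// back off briefly when the source reports nothing to consume.
public class PollRunner {
    enum Status { READY, BACKOFF }

    interface PollableLike {
        Status process();
    }

    // Poll the source maxPolls times; count the polls that yielded events.
    static int drive(PollableLike source, int maxPolls, long backoffMillis) {
        int ready = 0;
        for (int i = 0; i < maxPolls; i++) {
            if (source.process() == Status.READY) {
                ready++;
            } else {
                try {
                    Thread.sleep(backoffMillis); // back off before the next poll
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        // A toy source that has an event on every other poll.
        final int[] n = {0};
        PollableLike toy = () -> (n[0]++ % 2 == 0) ? Status.READY : Status.BACKOFF;
        System.out.println(drive(toy, 4, 1)); // prints 2
    }
}
```

This is why the author notes Flume is not truly real-time: latency is bounded below by the polling interval plus the backoff.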
The key part is these few lines:

```java
existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
  TailFile tf = reader.getTailFiles().get(inode);
  if (tf.needTail()) {
    tailFileProcess(tf, true);
  }
}
```

reader.updateTailFiles()
returns the files that need monitoring; each one is then examined (by its last-modified time) to decide whether it needs to be tailed, and if it does, it gets tailed.
So, stepping into reader.updateTailFiles():
```java
for (TaildirMatcher taildir : taildirCache) {
  Map<String, String> headers = headerTable.row(taildir.getFileGroup());
  for (File f : taildir.getMatchingFiles()) {
    long inode = getInode(f);
    TailFile tf = tailFiles.get(inode);
    if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
      long startPos = skipToEnd ? f.length() : 0;
      tf = openFile(f, headers, inode, startPos);
      // ...
```
It iterates over the matchers, one per configured regular expression, and each matcher fetches its own matching files via taildir.getMatchingFiles():
```java
List<File> getMatchingFiles() {
  long now = TimeUnit.SECONDS.toMillis(
      TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
  long currentParentDirMTime = parentDir.lastModified();
  List<File> result;
  if (!cachePatternMatching ||
      lastSeenParentDirMTime < currentParentDirMTime ||
      !(currentParentDirMTime < lastCheckedTime)) {
    lastMatchedFiles = sortByLastModifiedTime(getMatchingFilesNoCache(isRecursive));
    lastSeenParentDirMTime = currentParentDirMTime;
    lastCheckedTime = now;
  }
  return lastMatchedFiles;
}
```
As you can see, getMatchingFilesNoCache(isRecursive)
is the method that actually collects the matching files — so it is the one to modify! (PS: the isRecursive
argument is my addition.) Stepping into it:
```java
private List<File> getMatchingFilesNoCache() {
  List<File> result = Lists.newArrayList();
  try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) {
    for (Path entry : stream) {
      result.add(entry.toFile());
    }
  } catch (IOException e) {
    logger.error("I/O exception occurred while listing parent directory. " +
        "Files already matched will be returned. " + parentDir.toPath(), e);
  }
  return result;
}
```
The original code uses Files.newDirectoryStream(parentDir.toPath(), fileFilter)
to collect every file directly under the parent directory that matches the filter into an iterable stream. (Note the try (...)
try-with-resources syntax, which closes the stream automatically.)
Found the spot — time to change it! I added an overloaded method right below getMatchingFilesNoCache(),
which keeps the change extensible:
```java
private List<File> getMatchingFilesNoCache(boolean recursion) {
  if (!recursion) {
    return getMatchingFilesNoCache();
  }
  List<File> result = Lists.newArrayList();
  // Use an unbounded queue: a bounded ArrayBlockingQueue(10) would throw
  // as soon as more than ten directories are pending.
  Queue<File> dirs = new ArrayDeque<>();
  dirs.offer(parentDir);
  while (!dirs.isEmpty()) {
    File dir = dirs.poll();
    // try-with-resources so the DirectoryStream is always closed
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath(), fileFilter)) {
      stream.forEach(path -> result.add(path.toFile()));
    } catch (IOException e) {
      logger.error("I/O exception occurred while listing parent directory. " +
          "Files already matched will be returned. (recursion)" + parentDir.toPath(), e);
    }
    File[] dirList = dir.listFiles();
    if (dirList == null) {  // not a directory, or listing failed
      continue;
    }
    for (File f : dirList) {
      if (f.isDirectory()) {
        dirs.offer(f);  // enqueue subdirectory for a later pass
      }
    }
  }
  return result;
}
```
I traverse the directory tree iteratively rather than recursively — the tree is flattened into a queue (a breadth-first walk). With that, the core change is done. Next, the recursion
parameter has to be wired through.
After the fancy divider, follow the trail! Work up through the chain of constructors, adding the parameter at each step — so where does its value ultimately come from? When a Flume source starts, its configure()
method is called, which copies settings from the Context
into the reader
and related objects:

```java
isRecursive = context.getBoolean(RECURSIVE, DEFAULT_RECURSIVE);
```

The context
lookup takes its configuration key and default value from TaildirSourceConfigurationConstants:
```java
public static final String RECURSIVE = "recursive";
public static final boolean DEFAULT_RECURSIVE = false;
```
This recursive
key is exactly the property name used in the Flume configuration file:
```
# Whether to support recursion
a1.sources.r1.recursive = true
```
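The key/default pair resolves the way you would expect. A sketch with a plain HashMap standing in for Flume's Context (whose getBoolean likewise returns the configured value if present, otherwise the supplied default); ConfigSketch and its getBoolean helper are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigSketch {
    public static final String RECURSIVE = "recursive";
    public static final boolean DEFAULT_RECURSIVE = false;

    // Return the configured value when the key is set, else the default.
    static boolean getBoolean(Map<String, String> ctx, String key, boolean dflt) {
        String v = ctx.get(key);
        return v == null ? dflt : Boolean.parseBoolean(v);
    }

    public static void main(String[] args) {
        Map<String, String> ctx = new HashMap<>();
        // Key absent: falls back to DEFAULT_RECURSIVE
        System.out.println(getBoolean(ctx, RECURSIVE, DEFAULT_RECURSIVE)); // prints false
        // As set by "a1.sources.r1.recursive = true" in the agent config
        ctx.put(RECURSIVE, "true");
        System.out.println(getBoolean(ctx, RECURSIVE, DEFAULT_RECURSIVE)); // prints true
    }
}
```

Defaulting to false keeps the patched source backward compatible: existing configurations behave exactly as before.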
All done — let's package it! Run the package build and drop the resulting jar into Flume's lib directory, replacing the original flume-taildir-source***.jar.
Start it up, test it — success!
```
a1.sources.r1.type = TAILDIR
a1.sources.r1.cachePatternMatching = false
a1.sources.r1.positionFile = /home/hadoop/app/flume/position/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/data/taildir/.*txt
a1.sources.r1.recursive = false
```
The full code is on GitHub: https://github.com/yerias/recursion-flume-taildir