Rewriting OutputFormat for Multi-Directory Output in Hadoop

Continuing from the previous post on rewriting InputFormat, this post implements multi-directory output in Hadoop by customizing the OutputFormat: a custom RecordWriter opens one output file per category and routes each record to the right file based on its content.

  1. Override RecordWriter

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class MultiFileRecordWriter extends RecordWriter<Text, NullWritable> {

        private FSDataOutputStream jdOut = null;
        private FSDataOutputStream otherOut = null;
        private FileSystem fileSystem = null;

        public MultiFileRecordWriter(TaskAttemptContext context) throws IOException {
            // Open one stream per target file. Letting the IOException propagate
            // (rather than swallowing it) avoids a NullPointerException in write() later.
            fileSystem = FileSystem.get(context.getConfiguration());
            jdOut = fileSystem.create(new Path("tunan-flink-2021/out/jd.txt"));
            otherOut = fileSystem.create(new Path("tunan-flink-2021/out/other.txt"));
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            // Route each record by its content: lines containing "jd" go to jd.txt,
            // everything else goes to other.txt
            if (key.toString().contains("jd")) {
                jdOut.write((key.toString() + "\n").getBytes(StandardCharsets.UTF_8));
            } else {
                otherOut.write((key.toString() + "\n").getBytes(StandardCharsets.UTF_8));
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // Close both streams; closeStream ignores nulls and close-time exceptions
            IOUtils.closeStream(jdOut);
            IOUtils.closeStream(otherOut);
        }
    }
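
To sanity-check the writer outside a full MapReduce job, you can drive it by hand. The snippet below is a minimal local sketch, not part of the original post: the class name, the sample lines www.jd.com and www.baidu.com, and the use of TaskAttemptContextImpl to fabricate a task context are all illustrative, and it assumes the default (local) filesystem.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class MultiFileRecordWriterSmokeTest {
        public static void main(String[] args) throws Exception {
            // Fabricate a task attempt context backed by the local filesystem (illustrative only)
            Configuration conf = new Configuration();
            TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());

            MultiFileRecordWriter writer = new MultiFileRecordWriter(context);
            writer.write(new Text("www.jd.com"), NullWritable.get());    // expected in out/jd.txt
            writer.write(new Text("www.baidu.com"), NullWritable.get()); // expected in out/other.txt
            writer.close(context);
        }
    }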
  2. Override FileOutputFormat

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MultiFileOutputFormat extends FileOutputFormat<Text, NullWritable> {

        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
            // Hand the framework our custom writer; one instance is created per task
            return new MultiFileRecordWriter(context);
        }
    }
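
A refinement worth noting: the writer above hardcodes its two output paths, which must be kept in sync with the driver's output directory by hand. Since FileOutputFormat stores the configured output directory on the job, the writer's constructor could derive the paths from it instead. A sketch of that alternative constructor, reusing the field names from step 1 (it also needs an extra import of FileOutputFormat in the writer class):

    public MultiFileRecordWriter(TaskAttemptContext context) throws IOException {
        // Derive file locations from the directory set via FileOutputFormat.setOutputPath
        // in the driver, instead of hardcoding "tunan-flink-2021/out" in two places
        Path outDir = FileOutputFormat.getOutputPath(context);
        fileSystem = FileSystem.get(context.getConfiguration());
        jdOut = fileSystem.create(new Path(outDir, "jd.txt"));
        otherOut = fileSystem.create(new Path(outDir, "other.txt"));
    }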
  3. Driver

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class MultiFileDriver extends Configured implements Tool {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            int exitCode = ToolRunner.run(conf, new MultiFileDriver(), args);
            System.exit(exitCode);
        }

        @Override
        public int run(String[] args) throws Exception {
            String in = "tunan-flink-2021/data/domain.txt";
            // FileOutputFormat requires that this directory not exist yet;
            // the custom writer then creates jd.txt and other.txt inside it
            String out = "tunan-flink-2021/out";

            Configuration conf = super.getConf();

            Job job = Job.getInstance(conf);
            job.setJarByClass(MultiFileDriver.class);

            job.setMapperClass(MultiFileMapper.class);
            job.setReducerClass(MultiFileReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            // Plug in the custom OutputFormat so records are routed by content
            job.setOutputFormatClass(MultiFileOutputFormat.class);

            FileInputFormat.setInputPaths(job, new Path(in));
            FileOutputFormat.setOutputPath(job, new Path(out));

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        }

        // Identity mapper: each input line becomes a key; the value carries nothing
        public static class MultiFileMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                context.write(value, NullWritable.get());
            }
        }

        // Pass-through reducer: re-emits each line once per input occurrence
        public static class MultiFileReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
            @Override
            protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                for (NullWritable value : values) {
                    context.write(key, value);
                }
            }
        }
    }
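
For comparison, Hadoop also ships a built-in MultipleOutputs helper that achieves similar content-based routing without a custom OutputFormat. The sketch below is an alternative, not the post's approach: RoutingReducer is a hypothetical name, and the registered names "jd" and "other" become file name prefixes in the output directory (e.g. jd-r-00000) rather than the exact jd.txt/other.txt names used above.

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    // Hypothetical reducer using MultipleOutputs. In the driver, register each name first:
    //   MultipleOutputs.addNamedOutput(job, "jd", TextOutputFormat.class, Text.class, NullWritable.class);
    //   MultipleOutputs.addNamedOutput(job, "other", TextOutputFormat.class, Text.class, NullWritable.class);
    public class RoutingReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        private MultipleOutputs<Text, NullWritable> mos;

        @Override
        protected void setup(Context context) {
            mos = new MultipleOutputs<>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Same routing rule as the custom writer: "jd" lines vs. everything else
            String name = key.toString().contains("jd") ? "jd" : "other";
            for (NullWritable value : values) {
                mos.write(name, key, value);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();  // flush and close all named output files
        }
    }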
Author: Tunan
Link: http://yerias.github.io/2021/06/20/hadoop/16/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless otherwise stated.