接着上篇重写InputFormat实现Hadoop多目录输出
重写RecordWriter
/**
 * RecordWriter that fans records out to two files: any record whose text
 * contains "jd" is appended to jd.txt, everything else to other.txt.
 * Values are NullWritable — only the key text is written, one line per record.
 */
public class MultiFileRecordWriter extends RecordWriter<Text, NullWritable> {
FSDataOutputStream jdOut = null;
FSDataOutputStream otherOut = null;
FileSystem fileSystem = null;
/**
 * Opens the two fixed destination files on the job's FileSystem.
 *
 * @param context task context supplying the Hadoop Configuration
 * @throws java.io.UncheckedIOException if either stream cannot be created
 */
public MultiFileRecordWriter(TaskAttemptContext context) {
try {
fileSystem = FileSystem.get(context.getConfiguration());
// NOTE(review): output paths are hard-coded and live inside the job's
// output directory — confirm this matches the intended layout.
jdOut = fileSystem.create(new Path("tunan-flink-2021/out/jd.txt"));
otherOut = fileSystem.create(new Path("tunan-flink-2021/out/other.txt"));
} catch (IOException e) {
// Fail fast: the original printStackTrace() left both streams null,
// guaranteeing a NullPointerException later in write().
throw new java.io.UncheckedIOException("Failed to create multi-file output streams", e);
}
}
/**
 * Routes one record to jd.txt or other.txt based on its content.
 */
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
// Explicit UTF-8: getBytes() with no charset uses the platform default,
// which makes output non-portable across machines.
byte[] line = (key.toString() + "\n").getBytes(java.nio.charset.StandardCharsets.UTF_8);
if (key.toString().contains("jd")) {
jdOut.write(line);
} else {
otherOut.write(line);
}
}
/** Closes both output streams; IOUtils.closeStream is null-safe and quiet. */
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(jdOut);
IOUtils.closeStream(otherOut);
}
}
重写FileOutputFormat
/**
 * OutputFormat whose writer is {@link MultiFileRecordWriter}, splitting
 * job output across multiple files instead of the single part file.
 */
public class MultiFileOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
return new MultiFileRecordWriter(context);
}
}
Driver
class MultiFileDriver extends Configured implements Tool {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
int run = ToolRunner.run(conf, new MultiFileDriver(), args);
System.exit(run);
}
public int run(String[] args) throws Exception {
String in = "tunan-flink-2021/data/domain.txt";
String out = "tunan-flink-2021/out";
Configuration conf = super.getConf();
Job job = Job.getInstance(conf);
job.setJarByClass(MultiFileDriver.class);
job.setMapperClass(MultiFileMapper.class);
job.setReducerClass(MultiFileReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(MultiFileOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(in));
FileOutputFormat.setOutputPath(job, new Path(out));
boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}
public static class MultiFileMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}
public static class MultiFileReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
for (NullWritable value : values) {
context.write(key, value);
}
}
}
}