Wednesday, January 9, 2013

Computing the intersection of files with Hadoop

1. Theory

   My classmate neil tossed me a small problem yesterday: use Hadoop to extract the entries that two files have in common. The files look like this:
input1.txt
aaaa
bbbb
cc
11

input2.txt
aaa
bbbb
ccc
22



In other words, we want the intersection of the files, with each common entry output only once. If you are reasonably familiar with Java and Hadoop there should be quite a few ways to solve this; I am not particularly familiar with either, so here I only present one approach.
   First merge the files while they are still local (Hadoop in Action has a merging example) and upload the merged result directly to HDFS. Since we want the intersection of two files, we need to add some tags during the merge so that the entries can be told apart when we count them. The tagging method is quick and dirty: append our tag to each line as the file is written to HDFS. Note that we process the data line by line, but FSDataInputStream, the file stream Hadoop gives us, does not read by line, so let's walk up its parent classes until we dig out an API that does:
FSDataInputStream -> DataInputStream: we quickly find that the parent class DataInputStream has a readLine() method, but it is marked as Deprecated. Its documentation says:
readLine()           Deprecated. This method does not properly convert bytes to characters. As of JDK 1.1, the preferred way to read lines of text is via the BufferedReader.readLine() method. Programs that use the DataInputStream class to read lines can be converted to use the BufferedReader class by replacing code of the form:
     DataInputStream d = new DataInputStream(in);
 
with:
     BufferedReader d
          = new BufferedReader(new InputStreamReader(in));
So we go look at the BufferedReader class. It is constructed from an InputStreamReader object, and InputStreamReader in turn is constructed from an InputStream, so what we really need is an InputStream. Looking back at DataInputStream: sure enough, it descends from InputStream, and since in Java a parent-class reference can point to a child-class object, we can pass a subclass of InputStream anywhere an InputStream is expected. That gives us line-by-line reading; each time a line is read we append our tag, using i for the i-th file, with the result shown below.

input1.txt
aaaa 0
bbbb 0
cc 0
11 0

input2.txt
aaa 1
bbbb 1
ccc 1
22 1
The merged file is then handed to Hadoop and processed by key: if both 0 and 1 appear among the values of the same key, that key is kept in the output file.
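Here is a minimal standalone sketch (not part of the original code) of just that tagging step; the method name tagAndUpload, the paths, and the hard-coded tag 0 are placeholders, and the usual java.io, org.apache.hadoop.conf and org.apache.hadoop.fs imports are assumed:

    // Sketch: copy one local file to HDFS line by line, appending a tag.
    public static void tagAndUpload(Configuration conf) throws IOException {
        FileSystem local = FileSystem.getLocal(conf);
        FileSystem hdfs = FileSystem.get(conf);
        // FSDataInputStream is (indirectly) an InputStream, so it can be
        // wrapped in an InputStreamReader and then a BufferedReader.
        FSDataInputStream fis = local.open(new Path("input1.txt"));
        BufferedReader br = new BufferedReader(new InputStreamReader(fis));
        FSDataOutputStream out = hdfs.create(new Path("/tmp/merged.txt"));
        String line;
        while ((line = br.readLine()) != null) {
            out.write((line + " 0\n").getBytes());    // lines from file 0 get tag "0"
        }
        br.close();
        out.close();
    }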


2. Implementation

import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class GetUnite extends Configured implements Tool {
    // Map: split each merged line "entry tag" and emit (entry, tag).
    public static class MapperC extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {
            public void map(LongWritable offset, Text val, OutputCollector<Text, Text> out,
                    Reporter rep) throws IOException {
                String[] res = val.toString().split(" ");
                out.collect(new Text(res[0]), new Text(res[1]));
            }
    }

    // Reduce: a key appeared in both files if its values include both tag 0 and tag 1.
    public static class ReducerC extends MapReduceBase
            implements Reducer<Text, Text, Text, NullWritable> {
            public void reduce(Text key, Iterator<Text> itr, OutputCollector<Text, NullWritable> out,
                                  Reporter rep) throws IOException {
                int flag = -1;    // tag of the first value seen for this key: 0 or 1
                while(itr.hasNext()) {
                    String val = itr.next().toString();
                    if(0 == val.compareTo("1") && 0 == flag) {
                        // first saw tag 0, now tag 1: keep the key
                        out.collect(key, NullWritable.get());
                        break;
                    } else if(0 == val.compareTo("0") && 1 == flag) {
                        // first saw tag 1, now tag 0: keep the key
                        out.collect(key, NullWritable.get());
                        break;
                    } else if (-1 == flag) {
                        flag = val.compareTo("0");    // "0" -> 0, "1" -> 1
                    }
                }
            }
    }
    
    public void PrintUsage() {
        System.out.println("Usage  ");
        ToolRunner.printGenericCommandUsage(System.out);
    }

    public void GenerateMergeFile(Path inDir, Path outDir) throws Exception {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FileSystem local = FileSystem.getLocal(conf);
        try {
            FileStatus[] inputFiles = local.listStatus(inDir);
            FSDataOutputStream out = hdfs.create(outDir);
            for (int i = 0; i < inputFiles.length; i++) {
                System.out.println("merging file: " + inputFiles[i].getPath().getName() + "...");
                FSDataInputStream fis = local.open(inputFiles[i].getPath());
                BufferedReader br = new BufferedReader(new InputStreamReader(fis));
                String res = null;
                // tag every line with the index of the file it came from
                while(null != (res = br.readLine())) {
                    res += " " + i + "\n";
                    byte[] buffer = res.getBytes();
                    out.write(buffer, 0, buffer.length);
                }
                br.close();    // close each file's reader, not just the last one
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            PrintUsage();
            return -1;
        }
        Path tmpDir = new Path("/home/arvinpeng/" + getClass().getName() + 
                Integer.toString(new Random().nextInt()));
        Path inDir = new Path(args[0]);
        Path outDir = new Path(args[1]);
        Path midDir = tmpDir.suffix("/arvin.peng");
        GenerateMergeFile(inDir, midDir);
        JobConf jobConf = new JobConf(getConf(), getClass());
        try {
            jobConf.setJobName("GetUnite-Job");
            jobConf.setMapperClass(MapperC.class);
            jobConf.setReducerClass(ReducerC.class);

            FileInputFormat.setInputPaths(jobConf, tmpDir);
            FileOutputFormat.setOutputPath(jobConf, outDir);

            jobConf.setMapOutputKeyClass(Text.class);
            jobConf.setMapOutputValueClass(Text.class);
            jobConf.setOutputKeyClass(Text.class);
            jobConf.setOutputValueClass(NullWritable.class);

            jobConf.setNumReduceTasks(1);

            JobClient.runJob(jobConf);
        } finally {
            FileSystem.get(jobConf).delete(tmpDir, true);
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new Configuration(), new GetUnite(), args);
        System.exit(ret);
    }
}
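
If the class is packaged into a jar (the jar name below is only a placeholder), the job would be launched roughly as follows, with the local directory containing input1.txt and input2.txt as the first argument and an HDFS output directory as the second:

    hadoop jar getunite.jar GetUnite /path/to/local/input /path/to/hdfs/output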

3. Extension

   The program intersects two files. What about the intersection of N files? One option is to change only the reduce part so that a key is collected only when it shows up in all N files, but the amount of code grows sharply. Another is to merge recursively: split the files into N/2 pairs, run a Hadoop pass over each pair, then pair up the resulting files and merge again, repeating until only one file is left. This needs some adjustment to the code logic, but in return the program becomes far more general. If the number of files is large, the group size of 2 can also be raised so there are fewer groups (N/3, N/4, and so on), picking whatever value balances things out. Readers who have made it this far can come up with other methods and debate them in the comments.
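As a rough sketch of that first option (this is not from the original post; it reuses the imports from the listing above, tracks the distinct file tags with a Set, and hard-codes numFiles, which in practice would be read from the JobConf in configure()):

    // Sketch: an N-file variant of the reducer. A key is collected only when
    // its values carry all N distinct file tags 0..N-1.
    public static class ReducerN extends MapReduceBase
            implements Reducer<Text, Text, Text, NullWritable> {
        private int numFiles = 2;    // illustrative; read it from the JobConf in practice

        public void reduce(Text key, Iterator<Text> itr,
                OutputCollector<Text, NullWritable> out, Reporter rep)
                throws IOException {
            Set<String> tags = new HashSet<String>();
            while (itr.hasNext()) {
                tags.add(itr.next().toString());
                if (tags.size() == numFiles) {    // seen in every input file
                    out.collect(key, NullWritable.get());
                    break;
                }
            }
        }
    }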
