2013年1月6日星期日

hadoop蒙特卡洛算法续集

这个版本是参考Hadoop自带的samples实现的。相比上个野生版本,新增和修改了如下特性:
old/new
1:生成点数由输入文本文件决定/生成点数可由命令行参数指定,据此生成相应二进制文件

2:文件中存在冗余列/二进制输入输出文件中不再有冗余列,程序中通过NullWritable来对原本冗余列的位置占位



砍掉一堆import后的代码如下:
public class MonteCarloArg extends Configured implements Tool {

    static private final Path TMP_DIR = new Path(
            "/home/arvinpeng/" + MonteCarloArg.class.getSimpleName() + "_TMP_3_141592654");
    static private final Long RAND_MAX = (1L << 63) - 1;

    public static boolean inCircle(double x, double y) {
        return x * x + y * y <= 1.0;
    }

    public static class MapC extends MapReduceBase 
            implements Mapper {

            public void map(LongWritable offset, LongWritable size, OutputCollectorout,
                    Reporter rep) throws IOException {
                double x = 0.0;
                double y = 0.0;
                long num_in = 0;
                long num = size.get();
                Random rand = new Random();
                for (long i = 0; i < num; ++ i) {
                    x = Math.abs((double)(rand.nextLong() * 1.0 / RAND_MAX));
                    y = Math.abs((double)(rand.nextLong() * 1.0 / RAND_MAX));
                    if (inCircle(x, y)) {
                        ++ num_in;
                    }
                    if (i % 1000 == 0) {
                        rep.setStatus("Generated " + i + " samples");
                    }
                }
                out.collect(new BooleanWritable(true), new LongWritable(num_in));
            }
    }

    public static class ReduceC extends MapReduceBase
            implements Reducer, NullWritable> {

        private long in_sum = 0;
        private JobConf conf; // configuration for accessing the file system

        @Override
        public void configure(JobConf job_conf) {
            conf = job_conf;
        }

        public void reduce(BooleanWritable key, Iterator val, 
                OutputCollector, NullWritable>out, Reporter rep) throws IOException {
            while(val.hasNext()) {
                in_sum += val.next().get();
            }
        }

        @Override
        public void close() throws IOException{
            Path out_path = new Path(TMP_DIR, "outputPi");
            Path out_file = new Path(out_path, "outPi.dat");
            FileSystem file_sys = FileSystem.get(conf);
            SequenceFile.Writer writer = SequenceFile.createWriter(file_sys, conf, out_file, 
                    LongWritable.class, NullWritable.class, CompressionType.NONE);
            writer.append(new LongWritable(in_sum), NullWritable.get());
            writer.close();
        }
    }

    public static void generateInputFile(FileSystem fsys, JobConf conf, Path in_dir, int num_sample, 
            long num_point) throws IOException {
        if (fsys.exists(TMP_DIR)) {
            throw new IOException(fsys.makeQualified(TMP_DIR) + " already exits!");
        } else if (!fsys.mkdirs(in_dir)) {
            throw new IOException("Can't create input directory " + in_dir);
        }
        for(int i = 0; i < num_sample; ++ i) {
            final Path in = new Path(in_dir, "part" + i);
            final LongWritable offset = new LongWritable(i * num_point); 
            final LongWritable size = new LongWritable(num_point);
            final SequenceFile.Writer writer = SequenceFile.createWriter(fsys, conf, 
                in, LongWritable.class, LongWritable.class, CompressionType.NONE);
            try {
                writer.append(offset, size);
            } finally {
                writer.close();
            }
            System.out.println("Wrote input for map # " + i);
        }
    }
    
    public static BigDecimal estimate(int ms, int rs, long size, JobConf jobConf) throws IOException {
        // set 
        jobConf.setJobName(MonteCarloArg.class.getSimpleName());
        jobConf.setInputFormat(SequenceFileInputFormat.class);
        jobConf.setOutputFormat(SequenceFileOutputFormat.class);
        jobConf.setMapOutputKeyClass(BooleanWritable.class);
        jobConf.setMapOutputValueClass(LongWritable.class);
        jobConf.setMapperClass(MapC.class);
        jobConf.setReducerClass(ReduceC.class);
        jobConf.setNumMapTasks(ms);
        jobConf.setNumReduceTasks(rs);
        // turn off speculative exec, because dfs doesn't handle multiple 
        // writers to the same file
        jobConf.setSpeculativeExecution(false);

        final Path in = new Path(TMP_DIR, "inputPi");
        final Path out = new Path(TMP_DIR, "outputPi");
        FileInputFormat.setInputPaths(jobConf, in);
        FileOutputFormat.setOutputPath(jobConf, out);
        FileSystem fsys = FileSystem.get(jobConf);
        try {
            generateInputFile(fsys, jobConf, in, ms, size); 
            System.out.println("Starting Hadoop Job");
            final long start_time = System.currentTimeMillis();
            JobClient.runJob(jobConf);
            final double duration = (System.currentTimeMillis() - start_time) / 1000.0;
            System.out.println("Job finished in " + duration + " seconds");

            // read file
            Path resFile = new Path(out, "outPi.dat");
            LongWritable in_circle = new LongWritable();
            SequenceFile.Reader reader = new SequenceFile.Reader(fsys, resFile, jobConf);
            try {
                reader.next(in_circle);
            } finally {
                reader.close();
            }
            return BigDecimal.valueOf(4).setScale(20).multiply(BigDecimal.valueOf(in_circle.get())).divide(
                    BigDecimal.valueOf(ms)).divide(BigDecimal.valueOf(size));
        } finally {
            //fsys.delete(TMP_DIR, true);
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 3) {
           System.err.print("Usage " + getClass().getName() + "    "); 
           ToolRunner.printGenericCommandUsage(System.err);
           return -1;
        }
        final int MS = Integer.parseInt(args[0]);
        final int RS = Integer.parseInt(args[1]);
        final long NSAMPLES = Long.parseLong(args[2]); 
        System.out.println("number of maps " + MS);
        System.out.println("number of reduces " + RS);
        System.out.println("samples per map " + NSAMPLES);
        final JobConf JOBCONF = new JobConf(getConf(), getClass());
        final BigDecimal PI = estimate(MS, RS, NSAMPLES, JOBCONF); 
        System.out.println("Pi is = " + PI);
        return 0;
    }

    public static void main(String[] argv) throws Exception {
        int ret = ToolRunner.run(null, new MonteCarloArg(), argv);
        System.exit(ret);
    }
}

另外,关于蒙特卡洛算法,决定最终模拟结果精确性的还有一个重要因素,就是点的均匀程度(不是随机程度)。原始的rand函数生成的点存在相互重叠、分布不够均匀的缺点,导致结果不够理想。samples中点的生成采用了Halton Sequence,以保证点尽量均匀。但由于本文重点在分布式,所以忽略点生成的算法,这个版本依然采用野生随机生成法(samples中点生成算法代码太多实在懒得看我会乱说)。

没有评论:

发表评论