快好知 kuaihz

大数据技术,mapreduce版的wordcount,对文...

mport java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool{

    /*

     * 对文本文件进行Wordcount, 文本文件的输入类型是 TextInputFormat ,它实现了createRecordReader,

     * 返回创建的LineRecordReader 实现类,这个类里就有对应的key和value的类型

     * 

     * 文本文件

     * KEYIN:行字节偏移量

     * VALUEIN:一行数据

     * 

     * mapper的输入类型是由业务需求来自行确定类型,跟框架没关系,因为我们的需求是按照单词统计数量

     * 

     * key:单词,String 类型的封装类 Text

     * value:数值,Long类型的封装类LongWritable

     * 

     */

    public static class WordCountMapper extends Mapper{

      ZZ

        // map(), 一行调用一次

        @Override

        protected void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            String line = value.toString();

            System.out.println("map(): keyIn:" + key.get() + "; valueIn:" + value.toString());

            String[] splits = line.split(" ");

            for(String word : splits){

                keyOut.set(word);

                //map()输出数据,用 context.write()

                context.write(keyOut, valueOut);

                System.out.println("map(): keyOut:" + keyOut.toString() + "; valueOut:" + valueOut.get());

            }

        }

    }

    /*

     * KEYIN, VALUEIN: 根据map输出的类型来确定

     * KEYOUT, VALUEOUT:根据业务需求确定

     * KEYOUT 是单词,String 类型的封装类 Text

     * VALUEOUT 数值,Long类型的封装类LongWritable

     * 

     */

    public static class WordCountReducer extends Reducer{

        LongWritable valueOut = new LongWritable();

        //一个key调用一次

        @Override

        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

            StringBuilder sb = new StringBuilder();

            sb.append("reduce(): keyIn:" + key.toString() + "; vlaueIn:[");

            long sum = 0;

            for(LongWritable w : values){

                //通过get(),获取LongWritable 对象的实际值

                long num = w.get();

                sum += num;

                sb.append(num).append(",");

            }

            sb.deleteCharAt(sb.length() - 1);

            sb.append("]");

            System.out.println(sb.toString());

            valueOut.set(sum);

            context.write(key, valueOut);

        }

    }

    @Override

    public int run(String[] args) throws Exception {

//        job 创建及配置, 提交任务

        Configuration conf = getConf();

//        创建job对象

        Job job = Job.getInstance(conf, "wordcount");

        //job 任务运行类

        job.setJarByClass(WordCount.class);

        //job 任务 map运行类

        job.setMapperClass(WordCountMapper.class);

        //job 任务reduce 运行类

        job.setReducerClass(WordCountReducer.class);

        //job 任务map阶段输出的key的类型

        job.setMapOutputKeyClass(Text.class);

        //job 任务map阶段输出的value类型

        job.setMapOutputValueClass(LongWritable.class);

        //job 任务reduce阶段(最后阶段)输出的key的类型

        job.setOutputKeyClass(Text.class);

        //job 任务reduce阶段(最后阶段)输出的value的类型

        job.setOutputValueClass(LongWritable.class);

        //设置reduce个数

        job.setNumReduceTasks(2);

        //job 任务的输入目录

        FileInputFormat.addInputPath(job, new Path(args[0]));

        Path outputPath = new Path(args[1]);

        //job 任务的输出目录

        FileOutputFormat.setOutputPath(job, outputPath);

        //解决自动删除输出目录

        FileSystem fs = FileSystem.get(conf);

        //判断文件系统下存不存在该目录,如果存在删除

        if(fs.exists(outputPath)){

            //递归删除

            fs.delete(outputPath, true);

            System.out.println("output dir: " + outputPath.toString() + " deleted SUCCESS!");

        }

        //提交任务

        //waitForCompletion(false); false:代表不输出counter

        boolean status = job.waitForCompletion(false);

        return status ? 0 : 1;

    }

    public static void main(String[] args) throws Exception {

        //运行时 将输入输出目录放到执行参数里,用main()的args 接收到

        //  /tmp/input /tmp/output

        System.exit(ToolRunner.run(new WordCount(), args));

    }

}

本站资源来自互联网,仅供学习,如有侵权,请通知删除,敬请谅解!
搜索建议:mapreduce  mapreduce词条  wordcount  wordcount词条  数据  数据词条  技术  技术词条  
综合数码问答

 如何创建YouTube帐户

YouTube网站允许用户搜索和浏览数百万的视频频道、视频社区团体和评论,也允许用户对自己最爱的视频进行评分点赞或作出其它回应。同时,你也可以在YouTube网...(展开)

综合

 电脑如何设置双屏或者多屏

电脑怎么设置双屏或多屏显示呢?电脑多屏显示,可以实现一个屏幕玩游戏,另一个屏幕看股票,工作娱乐两不误,特别是剪辑师职业一般都配备多屏显示,那么如何给电脑设置双屏...(展开)