
Source Code Analysis of the MapTask Shuffle Phase

1. The collection phase

In the Mapper, a call to context.write(key, value) actually invokes the write() method of the proxy object NewOutputCollector:


public void write(KEYOUT key, VALUEOUT value
                  ) throws IOException, InterruptedException {
  output.write(key, value);
}

What actually runs is MapOutputBuffer's collect(); before a pair is collected, the Partitioner is called to compute its partition number:


@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}

2. Creating the NewOutputCollector object


@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  // Create the buffer object that actually collects the key-value pairs
  collector = createSortingCollector(job, reporter);
  // Get the total number of partitions
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    // Default case: create an anonymous inner class that assigns every
    // key-value pair to partition 0
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}

3. Creating the circular buffer object


@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
        createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
    new MapOutputCollector.Context(this, job, reporter);
  // Read mapreduce.job.map.output.collector.class from the current job's
  // configuration; if it is not set, use MapOutputBuffer.class
  Class<?>[] collectorClasses = job.getClasses(
    JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
  int remainingCollectors = collectorClasses.length;
  Exception lastException = null;
  for (Class clazz : collectorClasses) {
    try {
      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
        throw new IOException("Invalid output collector class: " + clazz.getName() +
          " (does not implement MapOutputCollector)");
      }
      Class<? extends MapOutputCollector> subclazz =
        clazz.asSubclass(MapOutputCollector.class);
      LOG.debug("Trying map output collector class: " + subclazz.getName());
      // Create the buffer object
      MapOutputCollector<KEY, VALUE> collector =
        ReflectionUtils.newInstance(subclazz, job);
      // Once the buffer object is created, initialize it
      collector.init(context);
      LOG.info("Map output collector class = " + collector.getClass().getName());
      return collector;
    } catch (Exception e) {
      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
      if (--remainingCollectors > 0) {
        msg += " (" + remainingCollectors + " more collector(s) to try)";
      }
      lastException = e;
      LOG.warn(msg, e);
    }
  }
  throw new IOException("Initialization of all the collectors failed. " +
    "Error in last collector was :" + lastException.getMessage(), lastException);
}

3. Initializing MapOutputBuffer, the circular buffer object


@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
                 ) throws IOException, ClassNotFoundException {
  job = context.getJobConf();
  reporter = context.getReporter();
  mapTask = context.getMapTask();
  mapOutputFile = mapTask.getMapOutputFile();
  sortPhase = mapTask.getSortPhase();
  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
  // Get the total number of partitions, determined by the number of ReduceTasks
  partitions = job.getNumReduceTasks();
  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

  //sanity checks
  // Read mapreduce.map.sort.spill.percent from the configuration; defaults to 0.8
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  // Read mapreduce.task.io.sort.mb; defaults to 100 MB
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
      "\": " + spillper);
  }
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
      "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  }
  // The sorter used on the key-value pairs before spilling: quicksort,
  // which sorts only the index records
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
      QuickSort.class, IndexedSorter.class), job);

  // buffers and accounting
  int maxMemUsage = sortmb << 20;
  maxMemUsage -= maxMemUsage % METASIZE;
  // Holds the serialized key-value pairs
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  // Holds the metadata of each key-value pair: partition number, indices, etc.
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;
  maxRec = kvmeta.capacity() / NMETA;
  softLimit = (int)(kvbuffer.length * spillper);
  bufferRemaining = softLimit;
  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
  LOG.info("soft limit at " + softLimit);
  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

  // k/v serialization
  // Get the key comparator used by quicksort; sorting is by key only!
  comparator = job.getOutputKeyComparator();
  // Get the serializers for key and value
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);

  // output counters
  mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
  mapOutputRecordCounter =
    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

  // Spills to disk may use a compression format; get the configured codec
  // compression
  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  } else {
    codec = null;
  }

  // Get the Combiner component
  // combiner
  final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  combinerRunner = CombinerRunner.create(job, getTaskID(),
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
      reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
  } else {
    combineCollector = null;
  }
  spillInProgress = false;
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  // Run the spill thread as a daemon in the background; spilling and
  // collecting happen on two separate threads!
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  spillLock.lock();
  try {
    // Start the thread
    spillThread.start();
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}
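The buffer sizing in init() can be checked by hand: with the defaults mapreduce.task.io.sort.mb = 100 and a spill percent of 0.8, the buffer is 100 MB and the soft limit sits at 80% of it. A standalone sketch of just that arithmetic, assuming METASIZE = 16 (four ints of metadata per record) as in MapOutputBuffer:

```java
public class BufferSizing {
    // Each metadata record is 4 ints (partition, key start, value start,
    // value length), i.e. 16 bytes -- this mirrors MapOutputBuffer.METASIZE.
    static final int METASIZE = 16;

    static int maxMemUsage(int sortmb) {
        int max = sortmb << 20;          // MB -> bytes
        return max - (max % METASIZE);   // round down to a multiple of METASIZE
    }

    static int softLimit(int bufferLen, float spillper) {
        // Spill is triggered once this many bytes are occupied
        return (int) (bufferLen * spillper);
    }

    public static void main(String[] args) {
        int buf  = maxMemUsage(100);      // 100 MB buffer
        int soft = softLimit(buf, 0.8f);  // spill threshold at 80%
        System.out.println(buf);   // 104857600
        System.out.println(soft);  // 83886080
    }
}
```

The 83886080 matches the "soft limit at ..." line that the LOG.info() calls above print for a default-configured job.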

4. Obtaining the Partitioner

The class is read from the configuration key mapreduce.job.partitioner.class; if none is specified, HashPartitioner.class is used.

In other words, when there is more than one ReduceTask and no partitioner component has been set, HashPartitioner is used.


@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
    throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>)
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}


public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
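The & Integer.MAX_VALUE mask clears the sign bit, so keys whose hashCode() is negative still land in [0, numReduceTasks); a bare hashCode() % numReduceTasks could return a negative partition, which the check below would reject. A small self-contained sketch of the same formula, with plain String keys standing in for Hadoop Writables:

```java
public class HashPartitionDemo {
    // Same formula as HashPartitioner.getPartition(), applied to an
    // arbitrary key's hashCode()
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // "polygenelubricants" is a well-known String whose hashCode()
        // is Integer.MIN_VALUE, i.e. negative
        String key = "polygenelubricants";
        System.out.println(key.hashCode());       // -2147483648
        // Without the mask, hashCode() % 3 would not be a valid partition;
        // with it the result is always in [0, numReduceTasks)
        System.out.println(partitionFor(key, 3)); // 0
    }
}
```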

A partition number must satisfy 0 <= partition < total number of partitions (the number of ReduceTasks):


if (partition < 0 || partition >= partitions) {
  throw new IOException("Illegal partition for " + key + " (" +
      partition + ")");
}

5. The MapTask shuffle workflow

① In map(), context.write() is called.

② This invokes MapOutputBuffer's collect(); the Partitioner component is called first to compute the partition number of the current key-value pair.

③ The key-value pair is collected into the MapOutputBuffer. If the spill threshold is exceeded, the spill thread is triggered in the background to perform the spill.

④ Before spilling, the key-value pairs are grouped by partition number and sorted with quicksort. The sort does not move the key-value pairs in memory; it only reorders their index records.

⑤ The spill begins: following the sorted index, the records are written to a temporary spill file. If no Combiner is defined, the data is spilled directly; if a Combiner is defined, CombinerRunner.combine() processes the key-value pairs before they are spilled.

⑥ Each spill produces one temporary file, so after several spills there are several temporary files.

⑦ Finally, flush() is executed once to spill the remaining key-value pairs.

⑧ mergeParts: the results of the multiple spills are merged into one overall file. Before the merge, a merge sort ensures that each partition of the merged file is still sorted. If a Combiner is defined, it runs again, provided there are at least 3 spill files; otherwise the segments are merged directly.

⑨ The end result is a single final file, divided by partition number into sections; the key-value pairs in each section are already sorted, waiting for the ReduceTasks to copy the data of their respective partitions.
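Step ④, "sort the index records instead of the records themselves", can be illustrated outside Hadoop. In this hypothetical miniature the records never move; only a small index array is reordered by (partition, key), which is exactly the order a spill would then write:

```java
import java.util.Arrays;

public class IndexSortSketch {
    // Sort only the index array; the records themselves are never moved.
    // Primary order: partition number; secondary order: key.
    static Integer[] sortedIndices(String[] keys, int[] partitionOf) {
        Integer[] idx = new Integer[keys.length];
        for (int i = 0; i < idx.length; i++) idx[i] = i;
        Arrays.sort(idx, (a, b) -> {
            if (partitionOf[a] != partitionOf[b]) {
                return Integer.compare(partitionOf[a], partitionOf[b]);
            }
            return keys[a].compareTo(keys[b]);
        });
        return idx;
    }

    public static void main(String[] args) {
        String[] keys     = {"pear", "apple", "fig", "banana"};
        int[] partitionOf = {1, 0, 1, 0};
        Integer[] order = sortedIndices(keys, partitionOf);
        // A spill would now walk `order` and write: apple, banana
        // (partition 0), then fig, pear (partition 1) -- keys[] is untouched
        System.out.println(Arrays.toString(order)); // [1, 3, 2, 0]
    }
}
```

The real MapOutputBuffer does the same thing over the kvmeta int records, swapping 16-byte metadata entries instead of boxed indices.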

6. Combiner

A Combiner is in fact just a Reducer:


Class<? extends Reducer<K,V,K,V>> cls =
  (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();

When the Combiner runs:

MapTask:

① Before each spill, if a Combiner is specified, it runs.

② When multiple spill segments are merged into one final file, the Combiner also runs, provided the number of segments is >= 3.

ReduceTask:

③ While a ReduceTask runs, it starts the shuffle process to copy the data produced by the MapTasks. After being copied, the data enters the memory shuffle works in, where it is merged and sorted; if there is too much data and memory is insufficient, part of it is spilled to disk, and whenever such a spill happens, the Combiner runs again.

① always runs; ② and ③ only run when their conditions are met!
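The point of running the Combiner at spill time is local aggregation: fewer records reach the disk. A hypothetical word-count sketch, with a TreeMap standing in for the sorted buffer and a counting loop standing in for the Reducer-typed Combiner:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombinerSketch {
    // Pre-spill combine: sum the values of equal keys, as a word-count
    // Reducer would. Input models the map-side records awaiting a spill;
    // output models what is actually written to the spill file.
    static Map<String, Integer> combine(List<String> mapOutputKeys) {
        Map<String, Integer> combined = new TreeMap<>(); // sorted, like a spill
        for (String key : mapOutputKeys) {
            combined.merge(key, 1, Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>(
            List.of("hadoop", "map", "hadoop", "shuffle", "map", "hadoop"));
        Map<String, Integer> spilled = combine(keys);
        // 6 map-output records shrink to 3 spilled records
        System.out.println(spilled); // {hadoop=3, map=2, shuffle=1}
    }
}
```

In the real framework this only happens when the Combiner's output type equals its input type, which is why the cast above constrains it to Reducer<K,V,K,V>.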

Summary

That is all for this article. I hope its content is of some reference value for your study or work.

Original article: https://blog.csdn.net/qq_43193797/article/details/86097451
