1publicsynchronizedvoidcollect(Kkey,Vvalue,intpartition 2)throwsIOException{ 3reporter.progress(); 4if(key.getClass()!=keyClass){ 5thrownewIOException("Type mismatch in key from map: expected " 6+keyClass.getName()+", recieved " 7+key.getClass().getName()); 8} 9if(value.getClass()!=valClass){10thrownewIOException("Type mismatch in value from map: expected "11+valClass.getName()+", recieved "12+value.getClass().getName());13}14//对kvoffsets的长度取模,暗示我们这是一个环形缓存。15finalintkvnext=(kvindex+1)%kvoffsets.length;16//进入临界区 17spillLock.lock();18try{19booleankvfull;20do{21if(sortSpillException!=null){22throw(IOException)newIOException("Spill failed"23).initCause(sortSpillException);24}25// sufficient acct space26kvfull=kvnext==kvstart;27finalbooleankvsoftlimit=((kvnext>kvend)28?kvnext-kvend>softRecordLimit29:kvend-kvnext<=kvoffsets.length-softRecordLimit);30if(kvstart==kvend&&kvsoftlimit){31LOG.info("Spilling map output: record full = "+kvsoftlimit);32//其实是设置变量并通过spillReady.signal(),通知spillThread;并等待spill结束33startSpill();34}35if(kvfull){36try{37while(kvstart!=kvend){38//kvstart不等于kvend,表示系统正在spill,等待spillDone信号39reporter.progress();40spillDone.await();41}42}catch(InterruptedExceptione){43throw(IOException)newIOException(44"Collector interrupted while waiting for the writer"45).initCause(e);46}47}48}while(kvfull);49}finally{50spillLock.unlock();51}5253try{54//先对key串行化,然后对value做串行化,临时变量keystart,valstart和valend分别记录了key结果的开始位置,value结果的开始位置和value结果的结束位置。串行化过程中,往缓冲区写是最终调用了Buffer.write方法55// serialize key bytes into buffer56intkeystart=bufindex;57keySerializer.serialize(key);58if(bufindex<keystart){59//如果key串行化后出现bufindex < keystart,那么会调用BlockingBuffer的reset方法。原因是在spill的过程中需要对<key,value>排序,这种情况下,传递给RawComparator的必须是连续的二进制缓冲区,通过BlockingBuffer.reset方法 会把bufvoid设置为bufmark,缓冲区开始部分往后挪,然后将原来位于bufmark到bufvoid出的结果,拷到缓冲区开始处,这样的话,key串行化的结果就连续存放在缓冲区的最开始处。 60bb.reset();61keystart=0;62}63// serialize value bytes into buffer64finalintvalstart=bufindex;65valSerializer.serialize(value);66intvalend=bb.markRecord();6768if(partition<0||partition>=partitions){69thrownewIOException("Illegal partition for "+key+" ("+70partition+")");71}7273mapOutputRecordCounter.increment(1);74mapOutputByteCounter.increment(valend>=keystart75?valend-keystart76:(bufvoid-keystart)+valend);7778// update accounting info79intind=kvindex*ACCTSIZE;80kvoffsets[kvindex]=ind;81kvindices[ind+PARTITION]=partition;82kvindices[ind+KEYSTART]=keystart;83kvindices[ind+VALSTART]=valstart;84kvindex=kvnext;85}catch(MapBufferTooSmallExceptione){86LOG.info("Record too large for in-memory buffer: "+e.getMessage());87//如果value的串行化结果太大,不能一次放入缓冲区88spillSingleRecord(key,value,partition);89mapOutputRecordCounter.increment(1);90return;91}92}
1privatevoidsortAndSpill()throwsIOException,ClassNotFoundException, 2InterruptedException{ 3//approximate the length of the output file to be the length of the 4//buffer + header lengths for the partitions 5longsize=(bufend>=bufstart 6?bufend-bufstart 7:(bufvoid-bufend)+bufstart)+ 8partitions*APPROX_HEADER_LENGTH; 9FSDataOutputStreamout=null;10try{11// 创建溢出文件12finalSpillRecordspillRec=newSpillRecord(partitions);13finalPathfilename=mapOutputFile.getSpillFileForWrite(getTaskID(),14numSpills,size);15out=rfs.create(filename);1617finalintendPosition=(kvend>kvstart)18?kvend19:kvoffsets.length+kvend;20//使用sorter进行排序, 在内存中进行,参照MapOutputBuffer的compare方法实现的这里的排序也是对序列化的字节做的排序。排序是在kvoffsets上面进行,参照MapOutputBuffer的swap方法实现。21sorter.sort(MapOutputBuffer.this,kvstart,endPosition,reporter);22intspindex=kvstart;23IndexRecordrec=newIndexRecord();24InMemValBytesvalue=newInMemValBytes();25for(inti=0;i<partitions;++i){26IFile.Writer<K,V>writer=null;27try{28longsegmentStart=out.getPos();29writer=newWriter<K,V>(job,out,keyClass,valClass,codec,30spilledRecordsCounter);31if(combinerRunner==null){32// 如果没有combinner则直接写键值33DataInputBufferkey=newDataInputBuffer();34while(spindex<endPosition&&35kvindices[kvoffsets[spindex%kvoffsets.length]36+PARTITION]==i){37finalintkvoff=kvoffsets[spindex%kvoffsets.length];38getVBytesForOffset(kvoff,value);39key.reset(kvbuffer,kvindices[kvoff+KEYSTART],40(kvindices[kvoff+VALSTART]-41kvindices[kvoff+KEYSTART]));42//键值写到溢出文件43writer.append(key,value);44++spindex;45}46}else{47intspstart=spindex;48while(spindex<endPosition&&49kvindices[kvoffsets[spindex%kvoffsets.length]50+PARTITION]==i){51++spindex;52}53//如果设置了combiner,则调用了combine方法后的结果写到IFile中,writer还是先前的writer。减少溢写到磁盘的数据量。54if(spstart!=spindex){55combineCollector.setWriter(writer);56RawKeyValueIteratorkvIter=57newMRResultIterator(spstart,spindex);58combinerRunner.combine(kvIter,combineCollector);59}60}6162// close the writer63writer.close();6465// record offsets66rec.startOffset=segmentStart;67rec.rawLength=writer.getRawLength();68rec.partLength=writer.getCompressedLength();69spillRec.putIndex(rec,i);7071writer=null;72}finally{73if(null!=writer)writer.close();74}75}7677if(totalIndexCacheMemory>=INDEX_CACHE_MEMORY_LIMIT){78// 写溢出索引文件,格式如+ "/spill" + spillNumber + ".out.index"79PathindexFilename=mapOutputFile.getSpillIndexFileForWrite(80getTaskID(),numSpills,81partitions*MAP_OUTPUT_INDEX_RECORD_LENGTH);82spillRec.writeToFile(indexFilename,job);83}else{84indexCacheList.add(spillRec);85totalIndexCacheMemory+=86spillRec.size()*MAP_OUTPUT_INDEX_RECORD_LENGTH;87}88LOG.info("Finished spill "+numSpills);89++numSpills;90}finally{91if(out!=null)out.close();92}93}
1publicsynchronizedvoidflush()throwsIOException,ClassNotFoundException, 2InterruptedException{ 3LOG.info("Starting flush of map output"); 4spillLock.lock(); 5try{ 6while(kvstart!=kvend){ 7reporter.progress(); 8spillDone.await(); 9}10if(sortSpillException!=null){11throw(IOException)newIOException("Spill failed"12).initCause(sortSpillException);13}14if(kvend!=kvindex){15kvend=kvindex;16bufend=bufmark;17sortAndSpill();18}19}catch(InterruptedExceptione){20throw(IOException)newIOException(21"Buffer interrupted while waiting for the writer"22).initCause(e);23}finally{24spillLock.unlock();25}26assert!spillLock.isHeldByCurrentThread();2728try{29spillThread.interrupt();30spillThread.join();31}catch(InterruptedExceptione){32throw(IOException)newIOException("Spill failed"33).initCause(e);34}35// release sort buffer before the merge36kvbuffer=null;37mergeParts();38}