【hadoop代码笔记】hadoop作业提交之Job初始化

一、概要描述

上一篇博文中主要描述了JobTracker和其几个服务(或功能)模块的接收到提交的job后的一些处理。其中很重要的一部分就作业的初始化。因为代码片段图的表达问题,本应该在上篇描述的内容,分开在本篇描述。

二、 流程描述

  1. 代码也接上文的最后一个方法EagerTaskInitializationListener的 jobAdded方法把JobInProgress类型的job放到List类型的 jobInitQueue中,有个单独的线程会对新加入的每个job进行初始化,其初始化调用的方法就是JobInProgress的方法 initTasks。

  2. 在JobInProgress的方法initTasks方法中,会根据传入的作业分片创建对应数量的TaskInProgress类型的maptask,同时会创建TaskInProgress类型的指定数量的reducetask。

  3. TaskInProgress的初始化是由其构造函数和构造函数中调用的init方法完成的。

Hadoop Job初始化

三、代码详细

1. EagerTaskInitializationListener的内部InitJob线程的run方法。调用JobInProgress的初始化方法。

 1static class InitJob implements Runnable {
 2    private JobInProgress job;
 3    public InitJob(JobInProgress job) {
 4      this.job = job;
 5    }
 6public void run() 
 7   {
 8      job.initTasks();            
 9    }
10  }

2. JobInProgress 类的initTasks方法。

主要流程:

1)根据读入的split确定map的数量,每个split一个map

2)如果Task数大于该jobTracker支持的最大task数,则抛出异常。

3)根据split的数量初始化maps

4)如果没有split,表示job已经成功结束。

  1. 根据指定的reduce数量numReduceTasks创建reduce task

6)计算并且最少剩下多少map task ,才可以开始Reduce task。默认是总的map task的5%,即大部分Map task完成后,就可以开始reduce task了。

//1) 根据读入的split确定map的数量,每个split一个map

1String jobFile = profile.getJobFile();
2            Path sysDir = new Path(this.jobtracker.getSystemDir());
3            FileSystem fs = sysDir.getFileSystem(conf);
4            DataInputStream splitFile =
5              fs.open(new Path(conf.get("mapred.job.split.file")));
6            JobClient.RawSplit[] splits;
7              splits = JobClient.readSplitFile(splitFile);         
8            numMapTasks = splits.length;

//2)如果Task数大于该jobTracker支持的最大task数,则抛出异常。

1int maxTasks = jobtracker.getMaxTasksPerJob();
2            if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
3              throw new IOException(
4                        "The number of tasks for this job " + 
5                        (numMapTasks + numReduceTasks) +
6                        " exceeds the configured limit " + maxTasks);
7            }           

//3)根据split的数量初始化maps

 1            maps = new TaskInProgress[numMapTasks];
 2            for(int i=0; i < numMapTasks; ++i) {
 3              inputLength += splits[i].getDataLength();
 4              maps[i] = new TaskInProgress(jobId, jobFile, 
 5                                           splits[i], 
 6                                           jobtracker, conf, this, i);
 7            }
 8            LOG.info("Input size for job "+ jobId + " = " + inputLength);
 9            if (numMapTasks > 0) { 
10              LOG.info("Split info for job:" + jobId + " with " + 
11                       splits.length + " splits:");
12              nonRunningMapCache = createCache(splits, maxLevel);
13            }
14    
15            this.launchTime = System.currentTimeMillis();

//4)如果没有split,表示job已经成功结束。

 1            if (numMapTasks == 0) {
 2              //设定作业的完成时间避免下次还会判断。
 3              this.finishTime = this.launchTime;
 4              status.setSetupProgress(1.0f);
 5              status.setMapProgress(1.0f);
 6              status.setReduceProgress(1.0f);
 7              status.setCleanupProgress(1.0f);
 8              status.setRunState(JobStatus.SUCCEEDED);
 9              tasksInited.set(true);
10              JobHistory.JobInfo.logInited(profile.getJobID(), 
11                                            this.launchTime, 0, 0);
12              JobHistory.JobInfo.logFinished(profile.getJobID(), 
13                                             this.finishTime, 0, 0, 0, 0,
14                                             getCounters());
15              return;
16            }

//5) 根据指定的reduce数量numReduceTasks创建reduce task

1            this.reduces = new TaskInProgress[numReduceTasks];
2            for (int i = 0; i < numReduceTasks; i++) {
3              reduces[i] = new TaskInProgress(jobId, jobFile, 
4                                              numMapTasks, i, 
5                                              jobtracker, conf, this);
6              nonRunningReduces.add(reduces[i]);
7            }
      // 6)计算最少剩下多少map task ,才可以开始Reduce task。默认是总的map task的5%,即大部分Map task完成后,就可以开始reduce task了。
1            completedMapsForReduceSlowstart = 
2              (int)Math.ceil(
3                  (conf.getFloat("mapred.reduce.slowstart.completed.maps", 
4                                 DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
5                   numMapTasks));
6    
7            tasksInited.set(true);
8        }

3. TaskInProgress的构造函数

有构造MapTask的构造函数和构造ReduceTask的构造函数。分别是如下。其主要区别在于构造mapTask是要传入输入分片信息的RawSplit,而Reduce Task则不需要。两个构造函数都要调用init方法,进行其他的初始化。

 1public TaskInProgress(JobID jobid, String jobFile, 
 2                        RawSplit rawSplit, 
 3                        JobTracker jobtracker, JobConf conf, 
 4                        JobInProgress job, int partition) {
 5    this.jobFile = jobFile;
 6    this.rawSplit = rawSplit;
 7    this.jobtracker = jobtracker;
 8    this.job = job;
 9    this.conf = conf;
10    this.partition = partition;
11    this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
12    setMaxTaskAttempts();
13    init(jobid);
14  }
 1public TaskInProgress(JobID jobid, String jobFile, 
 2                        int numMaps, 
 3                        int partition, JobTracker jobtracker, JobConf conf,
 4                        JobInProgress job) {
 5    this.jobFile = jobFile;
 6    this.numMaps = numMaps;
 7    this.partition = partition;
 8    this.jobtracker = jobtracker;
 9    this.job = job;
10    this.conf = conf;
11    this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
12    setMaxTaskAttempts();
13    init(jobid);
14  }
  1. TaskInProgress的init方法。初始化写map和reduce类型task都需要的初始化信息。
1void init(JobID jobId) {
2    this.startTime = System.currentTimeMillis();
3    this.id = new TaskID(jobId, isMapTask(), partition);
4    this.skipping = startSkipping();
5  }

完。