【hadoop代码笔记】hadoop作业提交之Job初始化
一、概要描述
在上一篇博文中主要描述了JobTracker和其几个服务(或功能)模块的接收到提交的job后的一些处理。其中很重要的一部分就作业的初始化。因为代码片段图的表达问题,本应该在上篇描述的内容,分开在本篇描述。
二、 流程描述
-
代码也接上文的最后一个方法EagerTaskInitializationListener的 jobAdded方法把JobInProgress类型的job放到List
类型的 jobInitQueue中,有个单独的线程会对新加入的每个job进行初始化,其初始化调用的方法就是JobInProgress的方法 initTasks。 -
在JobInProgress的方法initTasks方法中,会根据传入的作业分片创建对应数量的TaskInProgress类型的maptask,同时会创建TaskInProgress类型的指定数量的reducetask。
-
TaskInProgress的初始化是由其构造函数和构造函数中调用的init方法完成的。
三、代码详细
1. EagerTaskInitializationListener的内部InitJob线程的run方法。调用JobInProgress的初始化方法。
1static class InitJob implements Runnable {
2 private JobInProgress job;
3 public InitJob(JobInProgress job) {
4 this.job = job;
5 }
6public void run()
7 {
8 job.initTasks();
9 }
10 }
2. JobInProgress 类的initTasks方法。
主要流程:
1)根据读入的split确定map的数量,每个split一个map
2)如果Task数大于该jobTracker支持的最大task数,则抛出异常。
3)根据split的数量初始化maps
4)如果没有split,表示job已经成功结束。
- 根据指定的reduce数量numReduceTasks创建reduce task
6)计算并且最少剩下多少map task ,才可以开始Reduce task。默认是总的map task的5%,即大部分Map task完成后,就可以开始reduce task了。
//1) 根据读入的split确定map的数量,每个split一个map
1String jobFile = profile.getJobFile();
2 Path sysDir = new Path(this.jobtracker.getSystemDir());
3 FileSystem fs = sysDir.getFileSystem(conf);
4 DataInputStream splitFile =
5 fs.open(new Path(conf.get("mapred.job.split.file")));
6 JobClient.RawSplit[] splits;
7 splits = JobClient.readSplitFile(splitFile);
8 numMapTasks = splits.length;
//2)如果Task数大于该jobTracker支持的最大task数,则抛出异常。
1int maxTasks = jobtracker.getMaxTasksPerJob();
2 if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
3 throw new IOException(
4 "The number of tasks for this job " +
5 (numMapTasks + numReduceTasks) +
6 " exceeds the configured limit " + maxTasks);
7 }
//3)根据split的数量初始化maps
1 maps = new TaskInProgress[numMapTasks];
2 for(int i=0; i < numMapTasks; ++i) {
3 inputLength += splits[i].getDataLength();
4 maps[i] = new TaskInProgress(jobId, jobFile,
5 splits[i],
6 jobtracker, conf, this, i);
7 }
8 LOG.info("Input size for job "+ jobId + " = " + inputLength);
9 if (numMapTasks > 0) {
10 LOG.info("Split info for job:" + jobId + " with " +
11 splits.length + " splits:");
12 nonRunningMapCache = createCache(splits, maxLevel);
13 }
14
15 this.launchTime = System.currentTimeMillis();
//4)如果没有split,表示job已经成功结束。
1 if (numMapTasks == 0) {
2 //设定作业的完成时间避免下次还会判断。
3 this.finishTime = this.launchTime;
4 status.setSetupProgress(1.0f);
5 status.setMapProgress(1.0f);
6 status.setReduceProgress(1.0f);
7 status.setCleanupProgress(1.0f);
8 status.setRunState(JobStatus.SUCCEEDED);
9 tasksInited.set(true);
10 JobHistory.JobInfo.logInited(profile.getJobID(),
11 this.launchTime, 0, 0);
12 JobHistory.JobInfo.logFinished(profile.getJobID(),
13 this.finishTime, 0, 0, 0, 0,
14 getCounters());
15 return;
16 }
//5) 根据指定的reduce数量numReduceTasks创建reduce task
1 this.reduces = new TaskInProgress[numReduceTasks];
2 for (int i = 0; i < numReduceTasks; i++) {
3 reduces[i] = new TaskInProgress(jobId, jobFile,
4 numMapTasks, i,
5 jobtracker, conf, this);
6 nonRunningReduces.add(reduces[i]);
7 }
// 6)计算最少剩下多少map task ,才可以开始Reduce task。默认是总的map task的5%,即大部分Map task完成后,就可以开始reduce task了。
1 completedMapsForReduceSlowstart =
2 (int)Math.ceil(
3 (conf.getFloat("mapred.reduce.slowstart.completed.maps",
4 DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
5 numMapTasks));
6
7 tasksInited.set(true);
8 }
3. TaskInProgress的构造函数
有构造MapTask的构造函数和构造ReduceTask的构造函数。分别是如下。其主要区别在于构造mapTask是要传入输入分片信息的RawSplit,而Reduce Task则不需要。两个构造函数都要调用init方法,进行其他的初始化。
1public TaskInProgress(JobID jobid, String jobFile,
2 RawSplit rawSplit,
3 JobTracker jobtracker, JobConf conf,
4 JobInProgress job, int partition) {
5 this.jobFile = jobFile;
6 this.rawSplit = rawSplit;
7 this.jobtracker = jobtracker;
8 this.job = job;
9 this.conf = conf;
10 this.partition = partition;
11 this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
12 setMaxTaskAttempts();
13 init(jobid);
14 }
1public TaskInProgress(JobID jobid, String jobFile,
2 int numMaps,
3 int partition, JobTracker jobtracker, JobConf conf,
4 JobInProgress job) {
5 this.jobFile = jobFile;
6 this.numMaps = numMaps;
7 this.partition = partition;
8 this.jobtracker = jobtracker;
9 this.job = job;
10 this.conf = conf;
11 this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
12 setMaxTaskAttempts();
13 init(jobid);
14 }
- TaskInProgress的init方法。初始化写map和reduce类型task都需要的初始化信息。
1void init(JobID jobId) {
2 this.startTime = System.currentTimeMillis();
3 this.id = new TaskID(jobId, isMapTask(), partition);
4 this.skipping = startSkipping();
5 }
完。