【hadoop代码笔记】hadoop作业提交之TaskTracker 启动task

一、概要描述

上篇博文描 述了TaskTracker从Jobtracker如何从JobTracker获取到要执行的Task。在从JobTracker获取到 LaunchTaskAction后,执行addToTaskQueue方法来把要执行的Task加入到queue。在本篇博文中,我们来关注下该方法 后,TaskTracker怎么来处理这些Task。

实际上,TaskTracker初始化时,会初始化并启动两个TaskLauncher类型的线程,mapLauncher,reduceLauncher。在TaskTracker从JobTracher获取到任务后,对应的会把任务添加到两个 TaskLauncher的Queue中,其实是TaskLauncher维护的一个列表List tasksToLaunch。

TaskLauncher线程一直会定时检查TaskTracher上面有slot开业运行新的Task,则启动 Task。在这个过程中,先把task运行需要的文件解压到本地,并创建根据Task类型(Map或者Reduce)创建一个TaskRunner线程, 在TaskRunner中JvmManager调用JvmManagerForType、JvmRunner来启动一个java进程来执行Map或Reduce任务。

本文只是介绍到启动一个java进程,至于是什么样的java进程,对于maptask和reducetask分别是怎么执行的,在后面的child启动maptask,和child启动reducetask 会比较详细的介绍。

二、 流程描述

  1. tasktracker的offerService方法获取到要执行的task后调用addToTaskQueue方法,其实是调用taskrunner的addToTaskQueue方法
  2. TaskLauncher内部维护了一个List tasksToLaunch,只是把task加入到该
  3. taskLauncher是一个线程,在其run方法中从tasksToLaunch集合中取出task来执行,调用Tasktracker的startNewTask方法启动task。
  4. startNewtask方法中调用localizeJob方法把job相关的配置信息和要运行的jar拷贝到tasktracker本地,然后调用taskInProgress的launchTask方法来启动task。
  5. TaskInProgress的launchTask方法先调用localizeTask(task把task相关的配置信息获取到本地。然后创建一个TaskRunner线程来启动task。
  6. 在TaskRunner的run方法中构建一个java命令的执行的条件,包括引用类,执行目录等,入口类是Child。然后调用JvmManager 的launchJvm方法来调用。
  7. JvmManager 进而调用 JvmManagerForType的reapJvm,和spawnNewJvm 方法,发起调用.
  8. 在JvmManagerForType的spawnNewJvm 方法中创建了一个JvmRunner线程类执行调用.
  9. JvmRunner线程的run反复调用runChild方法来执行 一个命令行的调用。

TaskTracker启动task

三、代码详细

  1. TaskTracker的 addToTaskQueue方法。

接上文的最后一个方法的在heartbeat中把根据jobtracker的指令把需要launch的task调用addToTaskQueue方法加入task queue。

1//根据task的类型不同加入到不同的launcher中。
2private void addToTaskQueue(LaunchTaskAction action) {
3if (action.getTask().isMapTask()) {
4mapLauncher.addToTaskQueue(action);
5}else {
6reduceLauncher.addToTaskQueue(action);
7}
8}
  1. TaskLauncher 的addToTaskQueue方法,即把要launch的task加入到TaskLauncher内维护的一个列表List tasksToLaunch;中。
1public void addToTaskQueue(LaunchTaskAction action) {
2      synchronized (tasksToLaunch) {
3        TaskInProgress tip = registerTask(action, this);
4        tasksToLaunch.add(tip);
5        tasksToLaunch.notifyAll();
6      }
7}
  1. TaskLauncher线程的run方法。TaskLauncher是一个线程。一直检查task列表中有数据,取出一个来执行。
 1public void run() {    
 2          TaskInProgress tip;
 3          synchronized (tasksToLaunch) {
 4            while (tasksToLaunch.isEmpty()) {
 5              tasksToLaunch.wait();
 6            }
 7            //get the TIP
 8            tip = tasksToLaunch.remove(0);
 9           //wait for a slot to run
10          synchronized (numFreeSlots) {
11            while (numFreeSlots.get() == 0) {
12              numFreeSlots.wait();
13            }
14            LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
15                " and trying to launch "+tip.getTask().getTaskID());
16            numFreeSlots.set(numFreeSlots.get() - 1);
17            assert (numFreeSlots.get() >= 0);
18          }
19
20          //got a free slot. launch the task
21          startNewTask(tip);
22              return; // ALL DONE
23             }
24    }
25  }
  1. TaskTracker的startNewTask 启动一个新task。该方法的主要代码就一句。
1localizeJob(tip);
  1. TaskTracker的localizeJob方法。 初始化job的目录
 1private void localizeJob(TaskInProgress tip) throws IOException {
 2        Path localJarFile = null;
 3        Task t = tip.getTask();
 4        JobID jobId = t.getJobID();
 5        Path jobFile = new Path(t.getJobFile());
 6        FileStatus status = null;
 7        long jobFileSize = -1;
 8        status = systemFS.getFileStatus(jobFile);
 9        jobFileSize = status.getLen();
10
11        Path localJobFile = lDirAlloc.getLocalPathForWrite(
12                getLocalJobDir(jobId.toString())
13                + Path.SEPARATOR + "job.xml",
14                jobFileSize, fConf);
15        RunningJob rjob = addTaskToJob(jobId, tip);
16        synchronized (rjob) {
17            if (!rjob.localized) {
18    
19                FileSystem localFs = FileSystem.getLocal(fConf);
20    
21                systemFS.copyToLocalFile(jobFile, localJobFile);
22                JobConf localJobConf = new JobConf(localJobFile);
23                Path workDir = lDirAlloc.getLocalPathForWrite(
24                        (getLocalJobDir(jobId.toString())
25                                + Path.SEPARATOR + "work"), fConf);
26    
27                System.setProperty("job.local.dir", workDir.toString());
28                localJobConf.set("job.local.dir", workDir.toString());
29    
30                //把job的jar文件拷贝到本地文件系统并且解压。
31                String jarFile = localJobConf.getJar();
32    
33                Path jarFilePath = new Path(jarFile);
34                status = systemFS.getFileStatus(jarFilePath);
35                jarFileSize = status.getLen();
36    
37                //保证释放的目录容量有5倍的jar文件大小
38                localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
39                        getLocalJobDir(jobId.toString())
40                        + Path.SEPARATOR + "jars",
41                        5 * jarFileSize, fConf), "job.jar");
42    
43                //把jar文件拷贝到本地
44                systemFS.copyToLocalFile(jarFilePath, localJarFile);
45                localJobConf.setJar(localJarFile.toString());
46                OutputStream out = localFs.create(localJobFile);
47                localJobConf.writeXml(out);
48    
49                // also unjar the job.jar files 
50                RunJar.unJar(new File(localJarFile.toString()),
51                        new File(localJarFile.getParent().toString()));
52            }
53            rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
54                    localJobConf.getKeepFailedTaskFiles());
55            rjob.localized = true;
56            rjob.jobConf = localJobConf;
57        }
58    }
59    launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
60}
  1. TaskTracker的addTaskToJob方法。只是把job和task的关系加入到runningJobs中。
 1private RunningJob addTaskToJob(JobID jobId, 
 2                                  TaskInProgress tip) {
 3    synchronized (runningJobs) {
 4      RunningJob rJob = null;
 5      if (!runningJobs.containsKey(jobId)) {
 6        rJob = new RunningJob(jobId);
 7        rJob.localized = false;
 8        rJob.tasks = new HashSet<TaskInProgress>();
 9        runningJobs.put(jobId, rJob);
10      } else {
11        rJob = runningJobs.get(jobId);
12      }
13      synchronized (rJob) {
14        rJob.tasks.add(tip);
15      }
16      runningJobs.notify(); //notify the fetcher thread
17      return rJob;
18    }
19  }
  1. TaskTracker的launchTaskForJob方法。调用TaskInprogress的launchTask方法。
1private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) {
2    synchronized (tip) {
3      tip.setJobConf(jobConf);
4      tip.launchTask();
5    }
6  }
  1. TaskIProgress的 launchTask方法。
 1public synchronized void launchTask() throws IOException {
 2        if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
 3                this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
 4                this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
 5            localizeTask(task);
 6            if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
 7                this.taskStatus.setRunState(TaskStatus.State.RUNNING);
 8            }
 9            //创建一个Runner来运行。
10            this.runner = task.createRunner(TaskTracker.this, this);
11            this.runner.start();
12            this.taskStatus.setStartTime(System.currentTimeMillis());
13        }

9.TaskinProgress的localizeTask方法。把Task相关的文件拷贝到本地。

 1private void localizeTask(Task task) throws IOException{
 2
 3      Path localTaskDir = 
 4        lDirAlloc.getLocalPathForWrite(
 5          TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
 6            task.getTaskID().toString(), task.isTaskCleanupTask()), 
 7          defaultJobConf );
 8    
 9      FileSystem localFs = FileSystem.getLocal(fConf);
10      // create symlink for ../work if it already doesnt exist
11      String workDir = lDirAlloc.getLocalPathToRead(
12                         TaskTracker.getLocalJobDir(task.getJobID().toString())
13                         + Path.SEPARATOR  
14                         + "work", defaultJobConf).toString();
15      String link = localTaskDir.getParent().toString() 
16                      + Path.SEPARATOR + "work";
17      File flink = new File(link);
18      if (!flink.exists())
19        FileUtil.symLink(workDir, link);
20    
21      // 创建task的工作目录
22      Path cwd = lDirAlloc.getLocalPathForWrite(
23                   getLocalTaskDir(task.getJobID().toString(), 
24                      task.getTaskID().toString(), task.isTaskCleanupTask()) 
25                   + Path.SEPARATOR + MRConstants.WORKDIR,
26                   defaultJobConf);
27    
28      Path localTaskFile = new Path(localTaskDir, "job.xml");
29      task.setJobFile(localTaskFile.toString());
30      localJobConf.set("mapred.local.dir",
31                       fConf.get("mapred.local.dir")); 
32    
33      localJobConf.set("mapred.task.id", task.getTaskID().toString());     
34      }
35            OutputStream out = localFs.create(localTaskFile);
36             localJobConf.writeXml(out);
37    
38      task.setConf(localJobConf);
39    }

10.Task是个抽象类,两个子类分别是MapTask和ReduceTask。先关注Map的TaskRunner。

1public TaskRunner createRunner(TaskTracker tracker, 
2      TaskTracker.TaskInProgress tip) {
3    return new MapTaskRunner(tip, tracker, this.conf);
4  }
  1. TaskRunner线程的Run方法,有420行代码!主要作用是根据配置信息,构造java命令,启动一个java进程。

拼接一个java指令,启动一个单独的java进程来执行每一个map或者reduce任务。这个java命令的class是Child。即这个java进程最终调用的是Child类的main函数。

12.JvmManager的 launchJvm方法。在TaskRunner的run方法,是构造一个java命令的参数,调用JvmManager的launchJvm方法执行。

1public void launchJvm(TaskRunner t, JvmEnv env) {
2    if (t.getTask().isMapTask()) {
3      mapJvmManager.reapJvm(t, env);
4    } else {
5      reduceJvmManager.reapJvm(t, env);
6    }
7  }
  1. JvmManagerForType的reapJvm方法
 1private synchronized void reapJvm( 
 2        TaskRunner t, JvmEnv env) {
 3      if (t.getTaskInProgress().wasKilled()) {
 4       //如果task被杀死则直接返回
 5        return;
 6      }
 7      boolean spawnNewJvm = false;
 8      JobID jobId = t.getTask().getJobID();
 9      //检查是否有空闲的槽,如果小于最大jvm数,则重新开启一个jvm,不让你从现有job的空闲jvm中选择一个,或者杀死另外job的空闲jvm
10      int numJvmsSpawned = jvmIdToRunner.size();
11      JvmRunner runnerToKill = null;
12      if (numJvmsSpawned >= maxJvms) {
13        //go through the list of JVMs for all jobs.
14        Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = 
15          jvmIdToRunner.entrySet().iterator();
16
17        while (jvmIter.hasNext()) {
18          JvmRunner jvmRunner = jvmIter.next().getValue();
19          JobID jId = jvmRunner.jvmId.getJobId();
20          //look for a free JVM for this job; if one exists then just break
21          if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){
22            setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM
23            LOG.info("No new JVM spawned for jobId/taskid: " + 
24                     jobId+"/"+t.getTask().getTaskID() +
25                     ". Attempting to reuse: " + jvmRunner.jvmId);
26            return;
27          }
28    
29          if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
30              (!jId.equals(jobId) && !jvmRunner.isBusy())) {
31            runnerToKill = jvmRunner;
32            spawnNewJvm = true;
33          }
34        }
35      } else {
36        spawnNewJvm = true;
37      }
38    
39      if (spawnNewJvm) {
40        if (runnerToKill != null) {
41          LOG.info("Killing JVM: " + runnerToKill.jvmId);
42          runnerToKill.kill();
43        }
44        spawnNewJvm(jobId, env, t);
45        return;
46      }
47}
  1. JvmManagerForType的spawnNewJvm方法。重新启动一个jvm。
 1private void spawnNewJvm(JobID jobId, JvmEnv env,  
 2        TaskRunner t) {
 3      JvmRunner jvmRunner = new JvmRunner(env,jobId);
 4      jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
 5      jvmRunner.setDaemon(true);
 6      jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
 7      setRunningTaskForJvm(jvmRunner.jvmId, t);
 8      LOG.info(jvmRunner.getName());
 9      jvmRunner.start();
10    }
  1. JvmRunner线程的run方法。
1public void run() {
2        runChild(env);
3      }
  1. JvmRunner线程的runChild方法。其中掉ShellCommandExecutor的execute方法。 ShellCommandExecutor封装了shell执行。即把前面步骤构造的JvmEnv类型的执行信息分装成一个字符串列表,使用该列表构造一 个ShellCommandExecutor来执行命令。
1public void runChild(JvmEnv env) {
2env.vargs.add(Integer.toString(jvmId.getId()));
3          List<String> wrappedCommand = 
4            TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
5                env.logSize, env.pidFile);
6          shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
7              env.workDir, env.env);
8          shexec.execute();
9}

完.