[Hadoop Code Notes] Job Submission: Child Launching a Map Task

I. Overview

The previous post described how the TaskTracker launches a separate Java process to run a map or reduce task. In this post and the next we look at how that process, whose entry point is org.apache.hadoop.mapred.Child, executes the user-defined map or reduce work.

As described there, the TaskRunner thread builds a command of the form java [JVM options] org.apache.hadoop.mapred.Child <host> <port> <taskid> <jvmId> and launches it as a separate Java process. In Child's main method, the process uses the TaskUmbilicalProtocol to fetch the Task it should run from the TaskTracker and calls the Task's run method. For a map task, run uses Java reflection to construct the Mapper, the InputFormat, and the mapper Context, then invokes the mapper's run method to perform the actual map work.

II. Flow Description

  1. The Child class reads the TaskTracker's host and port, the first task attempt ID, and the JVM ID from its command-line arguments. In its main method it fetches the Task to execute from the TaskTracker via the TaskUmbilicalProtocol and invokes it.
  2. Inside Child, the Task's run method is executed. Task.run is the real entry point for the user-defined map or reduce work, and it reports progress back to the TaskTracker through the TaskUmbilicalProtocol.
  3. MapTask.run calls runNewMapper (or runOldMapper), which constructs the mapper instance and its execution context, then calls mapper.run to dispatch into the user-defined mapper.
  4. The mapper's run method pulls input records one at a time and calls map on each record.
  5. The mapper's map method holds the user's per-record processing logic; it is the only method a user has to define (see the sketch after this list).
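
To make step 5 concrete, here is a minimal user-defined mapper against the new (org.apache.hadoop.mapreduce) API. The class name and the word-count logic are just an illustration, not anything from the source walked through below:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: a mapper that tokenizes each input line and emits
// (word, 1) pairs. The framework calls map() once per input record.
public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // key is the byte offset of the line; value is the line's text.
    StringTokenizer tokens = new StringTokenizer(value.toString());
    while (tokens.hasMoreTokens()) {
      word.set(tokens.nextToken());
      context.write(word, ONE);
    }
  }
}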

(Figure: Child launching a map task)

III. Code Details

1. Child's main method. Each task is executed in a separate child process, and this method is the entry point of those processes. Let's see what it does:

1) Read the TaskTracker's address and port from the arguments passed in.

2) Using that address and port, connect to the TaskTracker over RPC; the umbilical proxy acts on the TaskTracker's behalf.

3) Query the TaskTracker for a JvmTask by JvmId.

4) Run the task.

public static void main(String[] args) throws Throwable {
  LOG.debug("Child starting");
  JobConf defaultConf = new JobConf();

  // Get the TaskTracker's host from the arguments.
  String host = args[0];
  // Get the port on which the TaskTracker answers requests.
  int port = Integer.parseInt(args[1]);
  InetSocketAddress address = new InetSocketAddress(host, port);
  final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
  final int SLEEP_LONGER_COUNT = 5;
  int jvmIdInt = Integer.parseInt(args[3]);
  JVMId jvmId = new JVMId(firstTaskid.getJobID(), firstTaskid.isMap(), jvmIdInt);

  // Talk to the TaskTracker over RPC; umbilical acts as the TaskTracker's proxy.
  TaskUmbilicalProtocol umbilical =
    (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
        TaskUmbilicalProtocol.versionID,
        address,
        defaultConf);
  int numTasksToExecute = -1; //-1 signifies "no limit"
  int numTasksExecuted = 0;
  //for the memory management, a PID file is written and the PID file
  //is written once per JVM. We simply symlink the file on a per task
  //basis later (see below). Long term, we should change the Memory
  //manager to use JVMId instead of TaskAttemptId
  Path srcPidPath = null;
  Path dstPidPath = null;
  int idleLoopCount = 0;
  Task task = null;
  try {
    while (true) {
      // taskid and isCleanup are static fields of Child (used by a
      // log-syncing shutdown hook), which is why they are not declared here.
      taskid = null;
      // Ask the TaskTracker for a JvmTask, keyed by our JvmId.
      JvmTask myTask = umbilical.getTask(jvmId);
      if (myTask.shouldDie()) {
        break;
      } else {
        if (myTask.getTask() == null) {
          taskid = null;
          if (++idleLoopCount >= SLEEP_LONGER_COUNT) {
            //we sleep for a bigger interval when we don't receive
            //tasks for a while
            Thread.sleep(1500);
          } else {
            Thread.sleep(500);
          }
          continue;
        }
      }
      idleLoopCount = 0;
      task = myTask.getTask();
      taskid = task.getTaskID();
      isCleanup = task.isTaskCleanupTask();
      // reset the statistics for the task
      FileSystem.clearStatistics();
      TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
      JobConf job = new JobConf(task.getJobFile());
      if (job.getBoolean("task.memory.mgmt.enabled", false)) {
        if (srcPidPath == null) {
          srcPidPath = new Path(task.getPidFile());
        }
        //since the JVM is running multiple tasks potentially, we need
        //to do symlink stuff only for the subsequent tasks
        if (!taskid.equals(firstTaskid)) {
          dstPidPath = new Path(task.getPidFile());
          FileUtil.symLink(srcPidPath.toUri().getPath(),
              dstPidPath.toUri().getPath());
        }
      }
      //setupWorkDir actually sets up the symlinks for the distributed
      //cache. After a task exits we wipe the workdir clean, and hence
      //the symlinks have to be rebuilt.
      TaskRunner.setupWorkDir(job);

      numTasksToExecute = job.getNumTasksToExecutePerJvm();
      assert(numTasksToExecute != 0);
      TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
      task.setConf(job);
      defaultConf.addResource(new Path(task.getJobFile()));
      // use job-specified working directory
      FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
      try {
        // Run the task.
        task.run(job, umbilical);             // run the task
      } finally {
        TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
        if (!taskid.equals(firstTaskid) &&
            job.getBoolean("task.memory.mgmt.enabled", false)) {
          // delete the pid-file's symlink
          new File(dstPidPath.toUri().getPath()).delete();
        }
      }
      if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
        break;
      }
    }
  } catch (FSError e) {
    LOG.fatal("FSError from child", e);
    umbilical.fsError(taskid, e.getMessage());
  } catch (Throwable throwable) {
    LOG.warn("Error running child", throwable);
    try {
      if (task != null) {
        // do cleanup for the task
        task.taskCleanup(umbilical);
      }
    } catch (Throwable th) {
      LOG.info("Error cleaning up: " + th);
    }
    // Report back any failures, for diagnostic purposes
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    throwable.printStackTrace(new PrintStream(baos));
    if (taskid != null) {
      umbilical.reportDiagnosticInfo(taskid, baos.toString());
    }
  } finally {
    RPC.stopProxy(umbilical);
  }
}
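
For reference, here is an abridged sketch of the TaskUmbilicalProtocol methods that appear in this walkthrough. It is trimmed down from the 0.20-era interface, so treat the exact signatures as approximate:

// Abridged sketch: the child-to-TaskTracker RPC interface. Only the methods
// used in this post are shown; exact signatures vary across Hadoop versions.
interface TaskUmbilicalProtocol extends VersionedProtocol {
  JvmTask getTask(JVMId jvmId) throws IOException;        // fetch the next task for this JVM
  boolean statusUpdate(TaskAttemptID taskId,              // report progress; false means the
      TaskStatus taskStatus) throws IOException;          // TaskTracker no longer knows this task
  boolean ping(TaskAttemptID taskId) throws IOException;  // heartbeat when there is no new progress
  void reportDiagnosticInfo(TaskAttemptID taskId,
      String trace) throws IOException;                   // ship a failure stack trace back
  void fsError(TaskAttemptID taskId,
      String message) throws IOException;                 // report a filesystem error
  void done(TaskAttemptID taskId) throws IOException;     // signal successful completion
}
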
2. TaskTracker's getTask method. TaskTracker implements the TaskUmbilicalProtocol interface, and getTask is one of the methods that interface defines: the child process calls it to obtain a task for a given jvmId.
public synchronized JvmTask getTask(JVMId jvmId) throws IOException {
  TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
  if (tip == null) {
    return new JvmTask(null, false);
  }
  if (tasks.get(tip.getTask().getTaskID()) != null) { //is task still present
    LOG.info("JVM with ID: " + jvmId + " given task: " +
        tip.getTask().getTaskID());
    return new JvmTask(tip.getTask(), false);
  } else {
    LOG.info("Killing JVM with ID: " + jvmId + " since scheduled task: " +
        tip.getTask().getTaskID() + " is " + tip.taskStatus.getRunState());
    return new JvmTask(null, true);
  }
}
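
The JvmTask returned here is just a small wrapper; a hedged sketch of its shape (field and method names per the 0.20-era source, serialization omitted):

// Sketch of JvmTask: pairs an optional Task with a shouldDie flag.
// getTask() == null with shouldDie == false means "idle, poll again later";
// shouldDie == true tells the child JVM to leave its polling loop and exit.
class JvmTask implements Writable {
  private Task t;
  private boolean shouldDie;

  public JvmTask(Task t, boolean shouldDie) {
    this.t = t;
    this.shouldDie = shouldDie;
  }

  public Task getTask() { return t; }
  public boolean shouldDie() { return shouldDie; }
  // readFields()/write() omitted: the object travels over the umbilical RPC.
}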

3. Task's run method. Because map and reduce execute quite differently, we look at MapTask's implementation first. It is the real entry point for running the user-defined map work and reports progress to the TaskTracker through the TaskUmbilicalProtocol. It starts a thread that reports progress to the TaskTracker and, depending on the task's role, runs the corresponding action: jobCleanup, jobSetup, or taskCleanup. For background on those task types, see the detailed discussion of JobTracker's heartbeat method in the earlier post on how the TaskTracker obtains a Task.

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {

  // Start the thread that reports progress to the TaskTracker.
  TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
  reporter.startCommunicationThread();
  boolean useNewApi = job.getUseNewMapper();
  initialize(job, getJobID(), reporter, useNewApi);

  // Depending on the task's role, run the corresponding action:
  // jobCleanup, jobSetup, or taskCleanup.
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  if (useNewApi) {
    runNewMapper(job, split, umbilical, reporter);
  } else {
    runOldMapper(job, split, umbilical, reporter);
  }
  done(umbilical, reporter);
}
4. TaskReporter's run method. It periodically reports status and progress to the parent TaskTracker.
public void run() {
  final int MAX_RETRIES = 3;
  int remainingRetries = MAX_RETRIES;
  // get current flag value and reset it as well
  boolean sendProgress = resetProgressFlag();
  while (!taskDone.get()) {
    try {
      boolean taskFound = true; // whether TT knows about this task
      // sleep for a bit
      try {
        Thread.sleep(PROGRESS_INTERVAL);
      } catch (InterruptedException e) {
        // the task finished, or the reporter thread was asked to stop
        break;
      }

      if (sendProgress) {
        // we need to send progress update
        updateCounters();
        taskStatus.statusUpdate(taskProgress.get(),
                                taskProgress.toString(),
                                counters);
        taskFound = umbilical.statusUpdate(taskId, taskStatus);
        taskStatus.clearStatus();
      }
      else {
        // send ping
        taskFound = umbilical.ping(taskId);
      }

      // if Task Tracker is not aware of our task ID (probably because it died and
      // came back up), kill ourselves
      if (!taskFound) {
        LOG.warn("Parent died.  Exiting "+taskId);
        System.exit(66);
      }

      sendProgress = resetProgressFlag();
      remainingRetries = MAX_RETRIES;
    } catch (Throwable t) {
      // communication failure: retry a few times, then give up and exit
      LOG.info("Communication exception: " + StringUtils.stringifyException(t));
      remainingRetries--;
      if (remainingRetries == 0) {
        LOG.warn("Last retry, killing " + taskId);
        System.exit(65);
      }
    }
  }
}
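
The sendProgress flag above is driven by a simple atomic flag on Task: the task body sets it whenever it makes progress, and the reporter thread reads and clears it once per interval. A self-contained sketch of that mechanism (method names follow the 0.20-era Task source, but treat this as an approximation):

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the flag behind sendProgress in the loop above.
class ProgressFlag {
  private final AtomicBoolean progressFlag = new AtomicBoolean(false);

  // Called from the task body whenever progress is made
  // (for example via context.progress()).
  void setProgressFlag() {
    progressFlag.set(true);
  }

  // Called by the reporter thread once per interval. Reading and clearing in
  // one atomic step means a progress signal can never be lost between cycles.
  boolean resetProgressFlag() {
    return progressFlag.getAndSet(false);
  }
}
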
5. Task's initialize method sets up several important objects needed later in execution, including the JobContext, the TaskAttemptContext, the OutputFormat, and the OutputCommitter.
public void initialize(JobConf job, JobID id,
                       Reporter reporter,
                       boolean useNewApi) throws IOException,
                                                 ClassNotFoundException,
                                                 InterruptedException {
  jobContext = new JobContext(job, id, reporter);
  taskContext = new TaskAttemptContext(job, taskId, reporter);
  if (getState() == TaskStatus.State.UNASSIGNED) {
    setState(TaskStatus.State.RUNNING);
  }
  if (useNewApi) {
    LOG.debug("using new api for output committer");
    outputFormat =
      ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
    committer = outputFormat.getOutputCommitter(taskContext);
  } else {
    committer = conf.getOutputCommitter();
  }
  Path outputPath = FileOutputFormat.getOutputPath(conf);
  if (outputPath != null) {
    if ((committer instanceof FileOutputCommitter)) {
      FileOutputFormat.setWorkOutputPath(conf,
        ((FileOutputCommitter)committer).getTempTaskOutputPath(taskContext));
    } else {
      FileOutputFormat.setWorkOutputPath(conf, outputPath);
    }
  }
  committer.setupTask(taskContext);
}
6. Task's runJobCleanupTask method. If the task is a jobCleanup task, it calls the OutputCommitter to remove the job's output (for example, the temporary output directory).
protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical,
                                 TaskReporter reporter
                                ) throws IOException, InterruptedException {
  // set phase for this task
  setPhase(TaskStatus.Phase.CLEANUP);
  getProgress().setStatus("cleanup");
  statusUpdate(umbilical);
  // do the cleanup
  committer.cleanupJob(jobContext);
  done(umbilical, reporter);
}

7. Task's runJobSetupTask. If the task is a jobSetup task, it calls the OutputCommitter's setupJob, for example to create the root directory that the job's tasks will run under.

protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
                               TaskReporter reporter
                               ) throws IOException, InterruptedException {
  // do the setup
  getProgress().setStatus("setup");
  committer.setupJob(jobContext);
  done(umbilical, reporter);
}
8. Task's runTaskCleanupTask. If the task is a taskCleanup task, it calls taskCleanup, which ultimately has the OutputCommitter delete the task's working directory.
protected void runTaskCleanupTask(TaskUmbilicalProtocol umbilical,
                                  TaskReporter reporter)
    throws IOException, InterruptedException {
  taskCleanup(umbilical);
  done(umbilical, reporter);
}
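
To make the three hooks above concrete: with the default FileOutputFormat/FileOutputCommitter, job setup and job cleanup amount to managing a _temporary directory under the job's output path. The following is a simplified sketch of that behavior, not the real class; the real committer also handles per-task commit and abort:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Simplified sketch of a FileOutputCommitter-style committer's job-level hooks.
class SketchOutputCommitter {
  static final String TEMP_DIR_NAME = "_temporary";

  // runJobSetupTask -> setupJob: create <output>/_temporary for tasks to write under.
  void setupJob(Configuration conf, Path outputPath) throws IOException {
    FileSystem fs = outputPath.getFileSystem(conf);
    fs.mkdirs(new Path(outputPath, TEMP_DIR_NAME));
  }

  // runJobCleanupTask -> cleanupJob: remove the temporary directory when the job ends.
  void cleanupJob(Configuration conf, Path outputPath) throws IOException {
    FileSystem fs = outputPath.getFileSystem(conf);
    fs.delete(new Path(outputPath, TEMP_DIR_NAME), true);
  }
}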

9. MapTask's runNewMapper method is the one we care about most; it is what actually executes the user-defined map. Step by step:

1) Build a TaskAttemptContext from the JobConf passed in; it carries all the job's configuration, including the mapper and input format classes used below.

2) Create a Mapper instance from the configured mapper class.

3) Create an InputFormat instance from the configured input format class.

4) Deserialize and rebuild the InputSplit.

5) Create the RecordReader. This is really an adapter (see the sketch after the code) wrapping the InputFormat's reader.

6) Build the output RecordWriter. With no reducers, output is the configured OutputFormat's RecordWriter, i.e. results are written out directly; if the number of reducers is non-zero, a NewOutputCollector is built instead.

7) Build the Mapper.Context, which packages up everything configured above for use during map execution.

8) Call the mapper's run method to perform the map work.

@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final BytesWritable rawSplit,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // 1. Build a TaskAttemptContext from the JobConf; it carries all job
  // configuration, including the mapper and input format classes used below.
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
  // 2. Create a Mapper instance from the configured mapper class.
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // 3. Create an InputFormat instance from the configured input format class.
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // 4. Deserialize and rebuild the InputSplit.
  // (splitClass is a String field of MapTask holding the split's class name.)
  org.apache.hadoop.mapreduce.InputSplit split = null;
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
  SerializationFactory factory = new SerializationFactory(job);
  Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>
    deserializer =
      (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>)
      factory.getDeserializer(job.getClassByName(splitClass));
  deserializer.open(splitBuffer);
  split = deserializer.deserialize(null);

  // 5. Create the RecordReader: an adapter wrapping the InputFormat's reader.
  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
    new NewTrackingRecordReader<INKEY,INVALUE>
        (inputFormat.createRecordReader(split, taskContext), reporter);

  job.setBoolean("mapred.skip.on", isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
       mapperContext = null;
  try {
    Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
      org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
      (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
                   Configuration.class,
                   org.apache.hadoop.mapreduce.TaskAttemptID.class,
                   org.apache.hadoop.mapreduce.RecordReader.class,
                   org.apache.hadoop.mapreduce.RecordWriter.class,
                   org.apache.hadoop.mapreduce.OutputCommitter.class,
                   org.apache.hadoop.mapreduce.StatusReporter.class,
                   org.apache.hadoop.mapreduce.InputSplit.class});

    // 6. Build the output RecordWriter. With no reducers, output is the
    // configured OutputFormat's RecordWriter, i.e. results are written
    // directly; otherwise a NewOutputCollector is built.
    if (job.getNumReduceTasks() == 0) {
      output = outputFormat.getRecordWriter(taskContext);
    } else {
      output = new NewOutputCollector(job, umbilical, reporter);
    }

    // 7. Build the Mapper.Context, packaging everything configured above
    // for use during map execution.
    mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                   input, output, committer,
                                                   reporter, split);

    input.initialize(split, mapperContext);
    // 8. Call the mapper's run method to perform the map work.
    mapper.run(mapperContext);
    input.close();
    output.close(mapperContext);
  } catch (NoSuchMethodException e) {
    throw new IOException("Can't find Context constructor", e);
  } catch (InstantiationException e) {
    throw new IOException("Can't create Context", e);
  } catch (InvocationTargetException e) {
    throw new IOException("Can't invoke Context constructor", e);
  } catch (IllegalAccessException e) {
    throw new IOException("Can't invoke Context constructor", e);
  }
}
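
Step 5 above called NewTrackingRecordReader an adapter. The following sketch shows the idea, simplified from the 0.20-era MapTask source (this is a standalone rewrite, not the original inner class): wrap the InputFormat's real RecordReader, count records as they are read, and delegate everything else.

import java.io.IOException;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Adapter sketch: counts input records on behalf of the wrapped reader.
class TrackingRecordReaderSketch<K, V> extends RecordReader<K, V> {
  private final RecordReader<K, V> real;     // the reader being adapted
  private final Counter inputRecordCounter;  // e.g. the MAP_INPUT_RECORDS counter

  TrackingRecordReaderSketch(RecordReader<K, V> real, Counter counter) {
    this.real = real;
    this.inputRecordCounter = counter;
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    real.initialize(split, context);
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    boolean hasNext = real.nextKeyValue();
    if (hasNext) {
      inputRecordCounter.increment(1);  // track each record handed to the mapper
    }
    return hasNext;
  }

  @Override
  public K getCurrentKey() throws IOException, InterruptedException {
    return real.getCurrentKey();
  }

  @Override
  public V getCurrentValue() throws IOException, InterruptedException {
    return real.getCurrentValue();
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return real.getProgress();
  }

  @Override
  public void close() throws IOException {
    real.close();
  }
}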

10. Mapper's run method. It executes map once for every input record: it simply pulls records out one by one and calls map on each. Unless you need to change how records are driven (MultithreadedMapper, for example, uses multiple threads), this method normally does not need to be overridden.

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}

11. Mapper's map method, called once per input record. What is shown here is only the default implementation, which writes the input through to the output unchanged; a user-defined mapper overrides this method with its own processing logic.

protected void map(KEYIN key, VALUEIN value,
                   Context context) throws IOException, InterruptedException {
  context.write((KEYOUT) key, (VALUEOUT) value);
}
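
Closing the loop: the class that taskContext.getMapperClass() returns in runNewMapper is simply whatever the job driver configured. A hedged driver sketch, reusing the illustrative WordCountMapper from the flow-description section (input and output paths come from the command line and are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative driver: registers the mapper class that the machinery above
// later instantiates by reflection inside the child JVM.
public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "word count"); // 0.20-era constructor
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCountMapper.class);  // what getMapperClass() will return
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}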

The end.