[Hadoop Code Notes] Hadoop Job Submission: the Child Process Launches a Map Task
I. Overview
The previous post described how the TaskTracker launches a separate Java process to run a Map or Reduce task. In this post and the next, we look at how that process, whose entry point is org.apache.hadoop.mapred.Child, executes the user-defined map or reduce task.
Continuing from the previous post: while the TaskRunner thread runs, it assembles a command of the form java -D... Child address port taskid and launches it as a separate Java process. In Child's main method, the process obtains the Task to execute from the TaskTracker over the TaskUmbilicalProtocol and calls the Task's run method. For a map task, run uses Java reflection to construct the Mapper, the InputFormat, and the mapper Context, and then invokes the constructed mapper's run method to carry out the map work.
II. Process Flow
- Using the three arguments it was started with (the TaskTracker's address, its port, and the task id), the Child class obtains the Task to execute from the TaskTracker over the TaskUmbilicalProtocol; this happens in Child's main method.
- Child then invokes the Task's run method, which is the real entry point for executing the user-defined map or reduce work; it reports progress back to the TaskTracker through the TaskUmbilicalProtocol.
- For a map task, MapTask.run calls runNewMapper (or runOldMapper for the old API) to drive the user's code. runNewMapper builds the mapper instance and the configuration it needs, then calls mapper.run, which dispatches to the user-defined mapper methods.
- In the mapper's run method, input records are pulled one at a time and map is called on each record.
- The mapper's map method contains the user-defined per-record logic and is the only method the user has to implement; a minimal driver that wires such a mapper into a job is sketched after this list.
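To make the last point concrete: the mapper and input format classes that the child later instantiates by reflection come from the job configuration built on the client side. Below is a minimal, illustrative driver using the new org.apache.hadoop.mapreduce API; the class names WordCountDriver and WordCountMapper and the input/output paths are hypothetical (WordCountMapper itself is sketched at the end of this post).

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCountDriver.class);
    // These two settings are what taskContext.getMapperClass() and
    // taskContext.getInputFormatClass() read back inside runNewMapper.
    job.setMapperClass(WordCountMapper.class);       // hypothetical mapper, sketched later
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```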
III. Code Details
1. Child's main method. Each task runs in its own child JVM, and this method is the entry point of that process. What does it do?
1) Obtain the TaskTracker's address and port from the arguments passed in.
2) Using that address and port, communicate with the TaskTracker over RPC; the umbilical object acts as a proxy through which operations on the TaskTracker are performed.
3) Query the TaskTracker for a JvmTask by JvmId.
4) Run the task.
public static void main(String[] args) throws Throwable {
  LOG.debug("Child starting");
  JobConf defaultConf = new JobConf();

  // Get the TaskTracker's address from the arguments passed in.
  String host = args[0];
  // Get the port the TaskTracker answers requests on.
  int port = Integer.parseInt(args[1]);
  InetSocketAddress address = new InetSocketAddress(host, port);
  final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
  final int SLEEP_LONGER_COUNT = 5;
  int jvmIdInt = Integer.parseInt(args[3]);
  JVMId jvmId = new JVMId(firstTaskid.getJobID(), firstTaskid.isMap(), jvmIdInt);

  // Talk to the TaskTracker over RPC; umbilical acts as a proxy for the TaskTracker.
  TaskUmbilicalProtocol umbilical =
    (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
        TaskUmbilicalProtocol.versionID,
        address,
        defaultConf);

  int numTasksToExecute = -1; //-1 signifies "no limit"
  int numTasksExecuted = 0;
  //for the memory management, a PID file is written and the PID file
  //is written once per JVM. We simply symlink the file on a per task
  //basis later (see below). Long term, we should change the Memory
  //manager to use JVMId instead of TaskAttemptId
  Path srcPidPath = null;
  Path dstPidPath = null;
  int idleLoopCount = 0;
  Task task = null;
  // taskid and isCleanup are fields in the full Child source (shared with a
  // log-syncing thread this excerpt omits); declared here so the excerpt compiles.
  TaskAttemptID taskid = null;
  boolean isCleanup = false;
  try {
    while (true) {
      taskid = null;
      // Ask the TaskTracker for a JvmTask, keyed by this JVM's JvmId.
      JvmTask myTask = umbilical.getTask(jvmId);
      if (myTask.shouldDie()) {
        break;
      } else {
        if (myTask.getTask() == null) {
          taskid = null;
          if (++idleLoopCount >= SLEEP_LONGER_COUNT) {
            //we sleep for a bigger interval when we don't receive
            //tasks for a while
            Thread.sleep(1500);
          } else {
            Thread.sleep(500);
          }
          continue;
        }
      }
      idleLoopCount = 0;
      task = myTask.getTask();
      taskid = task.getTaskID();
      isCleanup = task.isTaskCleanupTask();
      // reset the statistics for the task
      FileSystem.clearStatistics();
      TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
      JobConf job = new JobConf(task.getJobFile());
      if (job.getBoolean("task.memory.mgmt.enabled", false)) {
        if (srcPidPath == null) {
          srcPidPath = new Path(task.getPidFile());
        }
        //since the JVM is running multiple tasks potentially, we need
        //to do symlink stuff only for the subsequent tasks
        if (!taskid.equals(firstTaskid)) {
          dstPidPath = new Path(task.getPidFile());
          FileUtil.symLink(srcPidPath.toUri().getPath(),
              dstPidPath.toUri().getPath());
        }
      }
      //setupWorkDir actually sets up the symlinks for the distributed
      //cache. After a task exits we wipe the workdir clean, and hence
      //the symlinks have to be rebuilt.
      TaskRunner.setupWorkDir(job);

      numTasksToExecute = job.getNumTasksToExecutePerJvm();
      assert(numTasksToExecute != 0);
      TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
      task.setConf(job);
      defaultConf.addResource(new Path(task.getJobFile()));
      // use job-specified working directory
      FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
      try {
        // Run the task.
        task.run(job, umbilical); // run the task
      } finally {
        TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
        if (!taskid.equals(firstTaskid) &&
            job.getBoolean("task.memory.mgmt.enabled", false)) {
          // delete the pid-file's symlink
          new File(dstPidPath.toUri().getPath()).delete();
        }
      }
      if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
        break;
      }
    }
  } catch (FSError e) {
    LOG.fatal("FSError from child", e);
    umbilical.fsError(taskid, e.getMessage());
  } catch (Throwable throwable) {
    LOG.warn("Error running child", throwable);
    try {
      if (task != null) {
        // do cleanup for the task
        task.taskCleanup(umbilical);
      }
    } catch (Throwable th) {
      LOG.info("Error cleaning up" + th);
    }
    // Report back any failures, for diagnostic purposes
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    throwable.printStackTrace(new PrintStream(baos));
    if (taskid != null) {
      umbilical.reportDiagnosticInfo(taskid, baos.toString());
    }
  } finally {
    RPC.stopProxy(umbilical);
  }
}
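The numTasksToExecute / numTasksExecuted bookkeeping in the loop above is what implements JVM reuse: getNumTasksToExecutePerJvm() reads a per-job limit on how many tasks one child JVM may run (-1 means no limit). A minimal sketch of how a job could opt in on the client side, assuming the matching JobConf setter (backed by the mapred.job.reuse.jvm.num.tasks property):

```java
import org.apache.hadoop.mapred.JobConf;

public class JvmReuseExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Let each child JVM run up to 10 tasks before it exits; this is the value
    // Child.main reads back via job.getNumTasksToExecutePerJvm().
    conf.setNumTasksToExecutePerJvm(10);
    // Or, to reuse the JVM for an unlimited number of tasks:
    // conf.setNumTasksToExecutePerJvm(-1);
  }
}
```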
2. TaskTracker's getTask method. TaskTracker implements the TaskUmbilicalProtocol interface, and getTask is one of the methods that interface defines; the Child process calls it to fetch a Task by JvmId.
public synchronized JvmTask getTask(JVMId jvmId)
    throws IOException {
  TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
  if (tip == null) {
    return new JvmTask(null, false);
  }
  if (tasks.get(tip.getTask().getTaskID()) != null) { //is task still present
    LOG.info("JVM with ID: " + jvmId + " given task: " +
        tip.getTask().getTaskID());
    return new JvmTask(tip.getTask(), false);
  } else {
    LOG.info("Killing JVM with ID: " + jvmId + " since scheduled task: " +
        tip.getTask().getTaskID() + " is " + tip.taskStatus.getRunState());
    return new JvmTask(null, true);
  }
}
3. Task's run method. Because map and reduce execution differ substantially, we look first at MapTask's implementation. It is the real entry point for executing the user-defined map work and reports progress to the TaskTracker over the TaskUmbilicalProtocol. It starts a thread that reports progress to the TaskTracker and, depending on what the task is asked to do, runs the corresponding method, such as jobCleanup, jobSetup, or taskCleanup. For how these flags get set, see the detailed notes on JobTracker's heartbeat method in the post on how the TaskTracker obtains a Task.
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {

  // Start the thread that reports progress to the TaskTracker.
  TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
  reporter.startCommunicationThread();
  boolean useNewApi = job.getUseNewMapper();
  initialize(job, getJobID(), reporter, useNewApi);

  // Depending on what the task was asked to do, run the corresponding
  // method: job cleanup, job setup, or task cleanup.
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  if (useNewApi) {
    runNewMapper(job, split, umbilical, reporter);
  } else {
    runOldMapper(job, split, umbilical, reporter);
  }
  done(umbilical, reporter);
}
4. TaskReporter's run method. It periodically reports status and progress to the parent TaskTracker.
public void run() {
  final int MAX_RETRIES = 3;
  int remainingRetries = MAX_RETRIES;
  // get current flag value and reset it as well
  boolean sendProgress = resetProgressFlag();
  while (!taskDone.get()) {
    try {
      boolean taskFound = true; // whether TT knows about this task
      // sleep for a bit
      try {
        Thread.sleep(PROGRESS_INTERVAL);
      } catch (InterruptedException e) {
        // the progress/ping thread has been asked to stop
        break;
      }

      if (sendProgress) {
        // we need to send progress update
        updateCounters();
        taskStatus.statusUpdate(taskProgress.get(),
                                taskProgress.toString(),
                                counters);
        taskFound = umbilical.statusUpdate(taskId, taskStatus);
        taskStatus.clearStatus();
      }
      else {
        // send ping
        taskFound = umbilical.ping(taskId);
      }

      // if Task Tracker is not aware of our task ID (probably because it died and
      // came back up), kill ourselves
      if (!taskFound) {
        LOG.warn("Parent died. Exiting " + taskId);
        System.exit(66);
      }

      sendProgress = resetProgressFlag();
      remainingRetries = MAX_RETRIES;
    }
    catch (Throwable t) {
      // log the communication failure and give up after MAX_RETRIES attempts
      LOG.info("Communication exception: " + StringUtils.stringifyException(t));
      remainingRetries -= 1;
      if (remainingRetries == 0) {
        LOG.warn("Last retry, killing " + taskId);
        System.exit(65);
      }
    }
  }
}
5. Task's initialize method. It sets up several important objects used later in execution, including the JobContext, the OutputFormat, and the OutputCommitter.
public void initialize(JobConf job, JobID id,
                       Reporter reporter,
                       boolean useNewApi) throws IOException,
                                                 ClassNotFoundException,
                                                 InterruptedException {
  jobContext = new JobContext(job, id, reporter);
  taskContext = new TaskAttemptContext(job, taskId, reporter);
  if (getState() == TaskStatus.State.UNASSIGNED) {
    setState(TaskStatus.State.RUNNING);
  }
  if (useNewApi) {
    LOG.debug("using new api for output committer");
    outputFormat =
      ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
    committer = outputFormat.getOutputCommitter(taskContext);
  } else {
    committer = conf.getOutputCommitter();
  }
  Path outputPath = FileOutputFormat.getOutputPath(conf);
  if (outputPath != null) {
    if ((committer instanceof FileOutputCommitter)) {
      FileOutputFormat.setWorkOutputPath(conf,
        ((FileOutputCommitter)committer).getTempTaskOutputPath(taskContext));
    } else {
      FileOutputFormat.setWorkOutputPath(conf, outputPath);
    }
  }
  committer.setupTask(taskContext);
}
6. Task's runJobCleanupTask method. If the task is a jobCleanup task, the OutputCommitter is invoked to clean up the job's output (removing the temporary output).
protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical,
                                 TaskReporter reporter
                                ) throws IOException, InterruptedException {
  // set phase for this task
  setPhase(TaskStatus.Phase.CLEANUP);
  getProgress().setStatus("cleanup");
  statusUpdate(umbilical);
  // do the cleanup
  committer.cleanupJob(jobContext);
  done(umbilical, reporter);
}
7. Task's runJobSetupTask method. If the task is a jobSetup task, the OutputCommitter is invoked, for example to create the root directory the tasks will write under.
protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
                               TaskReporter reporter
                              ) throws IOException, InterruptedException {
  // do the setup
  getProgress().setStatus("setup");
  committer.setupJob(jobContext);
  done(umbilical, reporter);
}
8. Task's runTaskCleanupTask method. If the task is a taskCleanup task, taskCleanup is called, which ultimately has the OutputCommitter remove the task's working directory.
protected void runTaskCleanupTask(TaskUmbilicalProtocol umbilical,
                                  TaskReporter reporter)
    throws IOException, InterruptedException {
  taskCleanup(umbilical);
  done(umbilical, reporter);
}
9. MapTask's runNewMapper method is the one we care most about: it is where the user-defined map actually gets run.
1) Build a TaskAttemptContext from the passed-in JobConf; it carries all the job configuration used later, such as the mapper and input format classes.
2) Create a Mapper instance from the configured mapper class.
3) Create an InputFormat instance from the configured input format class.
4) Rebuild the InputSplit by deserializing the raw split bytes.
5) Create the RecordReader; this is an adapter (NewTrackingRecordReader) around the reader produced by the InputFormat (a simplified sketch of this adapter follows the code below).
6) Build the output RecordWriter. When there are no reducers, output is the configured OutputFormat's RecordWriter, i.e. results are written out directly; if the number of reducers is not zero, a NewOutputCollector is constructed instead.
7) Build the Mapper.Context, which wraps everything configured above and is what the map code sees while it runs.
8) Call the mapper's run method to do the map work.
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final BytesWritable rawSplit,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // 1. Build a TaskAttemptContext from the passed-in JobConf; it carries all the
  //    job configuration used later, such as the mapper and input format classes.
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
  // 2. Create a Mapper instance from the configured mapper class.
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // 3. Create an InputFormat instance from the configured input format class.
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // 4. Rebuild the InputSplit by deserializing the raw split bytes
  //    (splitClass is a MapTask field holding the split's class name).
  org.apache.hadoop.mapreduce.InputSplit split = null;
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
  SerializationFactory factory = new SerializationFactory(job);
  Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>
    deserializer =
      (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>)
        factory.getDeserializer(job.getClassByName(splitClass));
  deserializer.open(splitBuffer);
  split = deserializer.deserialize(null);

  // 5. Create the RecordReader; NewTrackingRecordReader adapts the reader
  //    produced by the InputFormat.
  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
    new NewTrackingRecordReader<INKEY,INVALUE>
      (inputFormat.createRecordReader(split, taskContext), reporter);

  job.setBoolean("mapred.skip.on", isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
       mapperContext = null;
  try {
    Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
      org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
        (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
                     Configuration.class,
                     org.apache.hadoop.mapreduce.TaskAttemptID.class,
                     org.apache.hadoop.mapreduce.RecordReader.class,
                     org.apache.hadoop.mapreduce.RecordWriter.class,
                     org.apache.hadoop.mapreduce.OutputCommitter.class,
                     org.apache.hadoop.mapreduce.StatusReporter.class,
                     org.apache.hadoop.mapreduce.InputSplit.class});

    // 6. Build the output RecordWriter. With no reducers, output is the configured
    //    OutputFormat's RecordWriter, i.e. results are written out directly;
    //    otherwise a NewOutputCollector feeds the shuffle.
    if (job.getNumReduceTasks() == 0) {
      output = outputFormat.getRecordWriter(taskContext);
    } else {
      output = new NewOutputCollector(job, umbilical, reporter);
    }

    // 7. Build the Mapper.Context, wrapping everything configured above; this is
    //    what the map code sees while it runs.
    mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                   input, output, committer,
                                                   reporter, split);

    input.initialize(split, mapperContext);
    // 8. Call the mapper's run method to do the map work.
    mapper.run(mapperContext);
    input.close();
    output.close(mapperContext);
  } catch (NoSuchMethodException e) {
    throw new IOException("Can't find Context constructor", e);
  } catch (InstantiationException e) {
    throw new IOException("Can't create Context", e);
  } catch (InvocationTargetException e) {
    throw new IOException("Can't invoke Context constructor", e);
  } catch (IllegalAccessException e) {
    throw new IOException("Can't invoke Context constructor", e);
  }
}
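Step 5 above describes NewTrackingRecordReader as an adapter around the reader the InputFormat creates. The following is a simplified, illustrative sketch of that idea, not the actual Hadoop class (the real one also updates the map input-record counter); it assumes only the public RecordReader and StatusReporter APIs.

```java
import java.io.IOException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Simplified stand-in for MapTask.NewTrackingRecordReader: every call is delegated
// to the real RecordReader created by the InputFormat, and each consumed record
// also signals progress through the reporter (Task's TaskReporter is a StatusReporter).
class TrackingRecordReaderSketch<K, V> extends RecordReader<K, V> {
  private final RecordReader<K, V> real;
  private final StatusReporter reporter;

  TrackingRecordReaderSketch(RecordReader<K, V> real, StatusReporter reporter) {
    this.real = real;
    this.reporter = reporter;
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    real.initialize(split, context);
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    boolean hasNext = real.nextKeyValue();
    if (hasNext) {
      reporter.progress();   // keep the progress/ping machinery fed
    }
    return hasNext;
  }

  @Override
  public K getCurrentKey() throws IOException, InterruptedException {
    return real.getCurrentKey();
  }

  @Override
  public V getCurrentValue() throws IOException, InterruptedException {
    return real.getCurrentValue();
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return real.getProgress();
  }

  @Override
  public void close() throws IOException {
    real.close();
  }
}
```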
10. Mapper's run method. It calls map once for every input record. It rarely changes: it simply pulls input records one at a time and invokes map on each. Unless the way records are driven needs to change (for example, MultithreadedMapper runs them on multiple threads), there is no need to override this method; an illustrative override is sketched after the code below.
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}
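As an illustration of when overriding run makes sense (the kind of change MultithreadedMapper makes, though far simpler), here is a hypothetical mapper that overrides run to report a status message every 10,000 records while still delegating each record to map; the class name and the 10,000-record interval are made up for the example.

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical example: override run() for per-batch bookkeeping while the
// per-record logic stays in map(). Input types assume TextInputFormat
// (LongWritable offset, Text line).
public class CountingMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    long records = 0;
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
      if (++records % 10000 == 0) {
        context.setStatus("processed " + records + " records");
      }
    }
    cleanup(context);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // pass each line through, keyed by its text, with a count of 1
    context.write(value, new LongWritable(1));
  }
}
```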
11. Mapper's map method. This is only the default implementation, which writes the input out unchanged (an identity map). A user-defined mapper overrides this method with its own per-record logic; a typical example follows the code below.
protected void map(KEYIN key, VALUEIN value,
                   Context context) throws IOException, InterruptedException {
  context.write((KEYOUT) key, (VALUEOUT) value);
}
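By contrast, a typical user-defined mapper overrides only map, exactly as the flow description assumed. A minimal, illustrative word-count mapper (not from this codebase):

```java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Overrides only map(); the inherited run() drives it once per input record.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer tokens = new StringTokenizer(value.toString());
    while (tokens.hasMoreTokens()) {
      word.set(tokens.nextToken());
      context.write(word, ONE);   // emit (word, 1) for every token on the line
    }
  }
}
```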
The end.