【hadoop代码笔记】Hadoop作业提交之客户端作业提交
一、概要描述
仅仅描述向Hadoop提交作业的第一步,即调用Jobclient的submitJob方法,向Hadoop提交作业。
二、 流程描述
Jobclient使用内置的JobSubmissionProtocol 实例jobSubmitClient 和JobTracker交互,最主要是提交作业、获取作业执行信息等。
在JobClient中作业提交的主要过程如下:
1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID 2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。 3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile 4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException 5)计算作业的输入分片。通过InputFormat的getSplits(job)方法获得作业的split并将split序列化封装为RawSplit。返回split数目,也即代表有多个分片有多少个map。详细参见InputFormat获取Split的方法。 6)writeNewSplits 方法把输入分片写到JobTracker的job目录下。 7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。 8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker作业放入到内存队列中,由作业调度器进行调度。并初始化作业实例。JobTracker创建job成功后会给JobClient传回一个JobStatus对象 用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个JobStatus对象创建一个 NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。
三、代码详细
Jobclient :JobClient是向JobTracker提交作业的接口,可以理解为Hadoop的Mapreduce作业框架向用户开放的作业提交入口。可以提交作业,监视作业状态等
JobSubmissionProtocol(为什么0.20.1的javadoc中找不到这个接口,虽然0.20.1 0.20.2代码中都是相同的用法,知道2.2.0貌似重命名为被ClientProtocol替换):JobClient和JobTracker进行通信的一个协议。JobClient实际上是用这个句柄来提交锁业并且监视作业的执行状况。
这个接口有两个实现:LocalJobRunner(conf)当mapred-site.xml中的mapred.job.tracker值为local是为此对象。表示在单机上执行;如果为一个地址的话则是 JobTracker的对象,表示分布式执 行。
详细可参照JobClient中 的初始化代码:
1 /**
2 *如果是非local的就会 连接到指定的JobTracker
3 */
4 public void init(JobConf conf) throws IOException {
5 String tracker = conf.get("mapred.job.tracker", "local");
6 if ("local".equals(tracker)) {
7 this.jobSubmitClient = new LocalJobRunner(conf);
8 } else {
9 this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
10 }
11 }
12
13 /*
14 * RPC不是本次主题重点,可参照后续发表的专题内容
15 */
16 private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
17 Configuration conf) throws IOException {
18 return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
19 JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
20 NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
21 }
通过代码来了解流程,了解如何调用JobClient向Hadoop集群提交作业。
1 public RunningJob submitJob(JobConf job) throws FileNotFoundException,
2 IOException {
3 try {
4 return submitJobInternal(job);
5 } catch (InterruptedException ie) {
6 throw new IOException("interrupted", ie);
7 } catch (ClassNotFoundException cnfe) {
8 throw new IOException("class not found", cnfe);
9 }
10 }
实际方法的执行是submitJobInternal方法。着重看下这个方法的内部执行。主要的逻辑部分比较详细的进行了注释
1public RunningJob submitJobInternal(JobConf job)
2 throws FileNotFoundException, ClassNotFoundException,
3 InterruptedException, IOException {
4
5 // 1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
6 JobID jobId = jobSubmitClient.getNewJobId();
7 // 2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
8 // 3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
9
10 Path submitJobDir = new Path(getSystemDir(), jobId.toString());
11 Path submitJarFile = new Path(submitJobDir, "job.jar");
12 Path submitSplitFile = new Path(submitJobDir, "job.split");
13 configureCommandLineOptions(job, submitJobDir, submitJarFile);
14 Path submitJobFile = new Path(submitJobDir, "job.xml");
15 int reduces = job.getNumReduceTasks();
16 JobContext context = new JobContext(job, jobId);
17
18 // Check the output specification
19 // 4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
20
21 if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
22 org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
23 .newInstance(context.getOutputFormatClass(), job);
24 output.checkOutputSpecs(context);
25 } else {
26 job.getOutputFormat().checkOutputSpecs(fs, job);
27 }
28
29 // 5)计算作业的输入分片。详细参见FormatInputFormat获取Split的方法。
30 // 6)writeNewSplits 方法把输入分片写到JobTracker的job目录下,名称是submitSplitFile
31 // job.split名称。
32 // 7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
33
34 // Create the splits for the job
35 LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
36 int maps;
37 if (job.getUseNewMapper()) {
38 maps = writeNewSplits(context, submitSplitFile);
39 } else {
40 maps = writeOldSplits(job, submitSplitFile);
41 }
42 job.set("mapred.job.split.file", submitSplitFile.toString());
43 job.setNumMapTasks(maps);
44
45 // Write job file to JobTracker's fs
46 FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
47 new FsPermission(JOB_FILE_PERMISSION));
48
49 try {
50 job.writeXml(out);
51 } finally {
52 out.close();
53 }
54
55 // 8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker根据接收到的submitJob()方法调用后,把调用放入到内存队列中,由作业调度器进行调度。并初始化作业实例。
56
57 JobStatus status = jobSubmitClient.submitJob(jobId);
58 if (status != null) {
59 return new NetworkedJob(status);
60 } else {
61 throw new IOException("Could not launch job");
62 }
63 }
1/**
2 * JobTracker.submitJob() kicks off a new job.
3 *
4 * Create a 'JobInProgress' object, which contains both JobProfile
5 * and JobStatus. Those two sub-objects are sometimes shipped outside
6 * of the JobTracker. But JobInProgress adds info that's useful for
7 * the JobTracker alone.
8 */
9 public synchronized JobStatus submitJob(JobID jobId) throws IOException {
10 if(jobs.containsKey(jobId)) {
11 //job already running, don't start twice
12 return jobs.get(jobId).getStatus();
13 }
14
15 JobInProgress job = new JobInProgress(jobId, this, this.conf);
16
17 String queue = job.getProfile().getQueueName();
18 if(!(queueManager.getQueues().contains(queue))) {
19 new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
20 throw new IOException("Queue \"" + queue + "\" does not exist");
21 }
22
23 // check for access
24 try {
25 checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
26 } catch (IOException ioe) {
27 LOG.warn("Access denied for user " + job.getJobConf().getUser()
28 + ". Ignoring job " + jobId, ioe);
29 new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
30 throw ioe;
31 }
32
33 return addJob(jobId, job);
34 }