【hadoop代码笔记】Hadoop作业提交中EagerTaskInitializationListener的作用
一、概述
继承自JobInProgressListener,实现了jobAdded,jobRemoved,jobUpdated方法。哦,不能说实现,应该说继承,JobInProgressListener居然是个抽象类,看着怎么这样的listener也应该是个interface。
在该listener被注册后,就响应jobAdded,jobRemoved,jobUpdated动作。在EagerTaskInitializationListener中,响应这三种动作来维护内部的一个job列表(List
二、主要代码逻辑
- 在job被添加到JobTracker时,注册的Lister会响应该方法。即当有作业提交到JobTracker时,该方法会把JIP加到jobInitQueue列表中,并且根据作业优先级和启动时间来调整其顺序。
- jobInitManagerThread会一直产看jobInitManagerThread列表中的job,逐一取出来初始化其task。
三、主要成员
1 private JobInitManager jobInitManager = new JobInitManager(); //一个job初始化线程,关注job队列jobInitQueue,取出进行初始化
2 private Thread jobInitManagerThread; // JobInitManager线程
3 private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>(); //响应lister的几种方法,维护的job队列
4 private ExecutorService threadPool; //一个线程池,里面的一个线程取一个job进行初始化
5 private int numThreads; //线程池的线程数,可配置
四、主要方法
1. EagerTaskInitializationListener的jobAdded方法:
首先关注的代码片段是该listener的jobAdded方法,前面说过,在FairScheduler的start方法中(taskTrackerManager.addJobInProgressListener(eagerInitListener))会把EagerTaskInitializationListener注册到JobTracker,在jobTracker中加入job的时候(addJob被调用),触发其上所有的jobListener的jobAdded方法。
在EagerTaskInitializationListener中,jobAdded只是简单的把job加入到一个List
1@Override
2 public void jobAdded(JobInProgress job) {
3 synchronized (jobInitQueue) {
4 jobInitQueue.add(job);
5 resortInitQueue();
6 jobInitQueue.notifyAll();
7 }
8
9 }
2. JobInitManager类:
一个线程,对jobInitQueue上保存的每个Job启动一个线程来执行初始化工作。在其run方法中会一直检查jobInitQueue是否有作业,有则拿出来从线程池中取一个线程处理。
1class JobInitManager implements Runnable {
2
3 public void run() {
4 JobInProgress job = null;
5 while (true) {
6 try {
7 synchronized (jobInitQueue) {
8 while (jobInitQueue.isEmpty()) {
9 jobInitQueue.wait();
10 }
11 job = jobInitQueue.remove(0);
12 }
13 threadPool.execute(new InitJob(job));
14 } catch (InterruptedException t) {
15 LOG.info("JobInitManagerThread interrupted.");
16 break;
17 }
18 }
19 LOG.info("Shutting down thread pool");
20 threadPool.shutdownNow();
21 }
22 }
3. InitJob 一个线程类定义,真正处理每一个job的初始化。其实调用的是job的初始化方法(JobInProgress initTasks)
1static class InitJob implements Runnable {
2 private JobInProgress job;
3 public InitJob(JobInProgress job) {
4 this.job = job;
5 }
6
7 public void run()
8 {
9 job.initTasks();
10 }
11 }
完。