- 浏览: 110112 次
- 性别:
- 来自: 长沙
文章分类
最新评论
-
chenglnb:
非常好,很强大,谢谢lz 帮解决了我的问题
MapReduce提交作业常见问题 -
Karl-z:
在ClassNotFoundExceptiond第二个解决办法 ...
MapReduce提交作业常见问题 -
blackproof:
编码问题如何解决
HDFS的文件操作 -
napolengogo:
远程访问的时候,client是非集群里的机器,就会存在权限问题 ...
HDFS的文件操作 -
jianggege:
每一个程序员都懂得代码!
有多少人懂我?
这篇博客是接着昨天分析MapReduce的流程继续进行分析的:
4.JobTracker接收Heartbeat并向TaskTracker分配任务
上一步中TaskTracker调用transmitHeartBeat方法发送Heartbeat给JobTracker,当JobTracker被RPC调用来发送heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status,boolean ,initialContact, booleanacceptNewTasks, short responseId)函数被调用。
(1)我们看一下JobTracker类的heartbeat方法
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
boolean initialContact,
boolean acceptNewTasks,
short responseId)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Got heartbeat from: " + status.getTrackerName() +
" (restarted: " + restarted +
" initialContact: " + initialContact +
" acceptNewTasks: " + acceptNewTasks + ")" +
" with responseId: " + responseId);
}
// Make sure heartbeat is from a tasktracker allowed by the jobtracker.
if (!acceptTaskTracker(status)) {
throw new DisallowedTaskTrackerException(status);
}
....
//初始化一个HeartbeatResponse对象
HeartbeatResponse prevHeartbeatResponse =
trackerToHeartbeatResponseMap.get(trackerName);
if (initialContact != true) {
// If this isn't the 'initial contact' from the tasktracker,
// there is something seriously wrong if the JobTracker has
// no record of the 'previous heartbeat'; if so, ask the
// tasktracker to re-initialize itself.
if (prevHeartbeatResponse == null) {
// This is the first heartbeat from the old tracker to the newly
// started JobTracker
// Jobtracker might have restarted but no recovery is needed
// otherwise this code should not be reached
LOG.warn("Serious problem, cannot find record of 'previous' " +
"heartbeat for '" + trackerName +
"'; reinitializing the tasktracker");
return new HeartbeatResponse(responseId,
new TaskTrackerAction[] {new ReinitTrackerAction()});
} else {
// It is completely safe to not process a 'duplicate' heartbeat from a
// {@link TaskTracker} since it resends the heartbeat when rpcs are
// lost see {@link TaskTracker.transmitHeartbeat()};
// acknowledge it by re-sending the previous response to let the
// {@link TaskTracker} go forward.
if (prevHeartbeatResponse.getResponseId() != responseId) {
LOG.info("Ignoring 'duplicate' heartbeat from '" +
trackerName + "'; resending the previous 'lost' response");
return prevHeartbeatResponse;
}
}
}
// Process this heartbeat
short newResponseId = (short)(responseId + 1);
status.setLastSeen(now);
if (!processHeartbeat(status, initialContact)) {
if (prevHeartbeatResponse != null) {
trackerToHeartbeatResponseMap.remove(trackerName);
}
return new HeartbeatResponse(newResponseId,
new TaskTrackerAction[] {new ReinitTrackerAction()});
}
// Initialize the response to be sent for the heartbeat
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
// Check for new tasks to be executed on the tasktracker
//如果TaskTracker向JobTracker请求一个task运行
if (acceptNewTasks && !isBlacklisted) {
TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
//setup和cleanup的task的优先次序最高
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null ) {
//taskScheduler.assignTasks方法注册一个task
tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
}
if (tasks != null) {
for (Task task : tasks) {
//将任务放入actions列表,返回给TaskTracker
expireLaunchingTasks.addNewTask(task.getTaskID());
if (LOG.isDebugEnabled()) {
LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
}
actions.add(new LaunchTaskAction(task));
}
}
}
}
....
return response;
}
通过调用上面的方法其中实现了task在taskScheduler的注册,JobQueueTaskScheduler是JobTracker默认的Task调度器,上面方法中taskScheduler.assignTasks(),注册一个Task。
(2)下一步我们看一下JobQueueTaskScheduler类的assignTasks方法
public synchronized List<Task> assignTasks(TaskTracker taskTracker)
throws IOException {
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
final int numTaskTrackers = clusterStatus.getTaskTrackers();
final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();
Collection<JobInProgress> jobQueue =
jobQueueJobInProgressListener.getJobQueue();
....
//
// Compute (running + pending) map and reduce task numbers across pool
// 计算剩余的map和reduce的工作量:remaining
int remainingReduceLoad = 0;
int remainingMapLoad = 0;
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
if (job.scheduleReduces()) {
remainingReduceLoad +=
(job.desiredReduces() - job.finishedReduces());
}
}
}
}
// Compute the 'load factor' for maps and reduces
double mapLoadFactor = 0.0;
if (clusterMapCapacity > 0) {
mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
}
double reduceLoadFactor = 0.0;
if (clusterReduceCapacity > 0) {
reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
}
final int trackerCurrentMapCapacity =
Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
trackerMapCapacity);
int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
boolean exceededMapPadding = false;
if (availableMapSlots > 0) {
exceededMapPadding =
exceededPadding(true, clusterStatus, trackerMapCapacity);
}
//计算平均每个TaskTracker应有的工作量,remaining/numTaskTrackers是剩余的工作量除以
TaskTracker的个数。
int numLocalMaps = 0;
int numNonLocalMaps = 0;
scheduleMaps:
for (int i=0; i < availableMapSlots; ++i) {
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
Task t = null;
// Try to schedule a node-local or rack-local Map task
t =
job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
assignedTasks.add(t);
++numLocalMaps;
// Don't assign map tasks to the hilt!
// Leave some free slots in the cluster for future task-failures,
// speculative tasks etc. beyond the highest priority job
if (exceededMapPadding) {
break scheduleMaps;
}
// Try all jobs again for the next Map task
break;
}
// Try to schedule a node-local or rack-local Map task
t =
job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
assignedTasks.add(t);
++numNonLocalMaps;
// We assign at most 1 off-switch or speculative task
// This is to prevent TaskTrackers from stealing local-tasks
// from other TaskTrackers.
break scheduleMaps;
}
}
}
}
int assignedMaps = assignedTasks.size();
....
return assignedTasks;
}
以上的过程可能要经过一个复杂的计算,由jobTracker调度Task,从上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配调度map task的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找TaskInProgress.同样的道理JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。然后又JobTracker进行任务的分配,这个步骤就结束了,由于这个步骤比较简单这里就不画流程图了。
5.TaskTracker接收HeartbeatResponse并执行任务
在向JobTracker发送heartbeat后,如果返回的heartbeatreponse中含有分配好的任务LaunchTaskAction,TaskTracker则调用addToTaskQueue方法,将其加入TaskTracker类中MapLauncher或者ReduceLauncher对象的taskToLaunch队列。具体的怎么通过RPC接收到heartbeatreponse这里不做分析,接收到分配的任务后,调用
addToTaskQueue方法。
(1)所以我们先看一下addToTaskQueue方法
private void addToTaskQueue(LaunchTaskAction action) {
if (action.getTask().isMapTask()) {
mapLauncher.addToTaskQueue(action);
} else {
reduceLauncher.addToTaskQueue(action);
}
}
在此,MapLauncher和ReduceLauncher对象均为TaskLauncher类的实例。该类是TaskTracker类的一个内部类,具有一个数据成员,是TaskTracker.TaskInProgress类型的队列。在此特别注意,在TaskTracker类内部所提到的TaskInProgress类均为TaskTracker的内部类,我们用TaskTracker.TaskInProgress表示,一定要和MapRed包中的TaskInProgress类区分,后者我们直接用TaskInProgress表示。如果应答包中包含的任务是map task则放入mapLancher的taskToLaunch队列,如果是reduce task则放入reduceLancher的taskToLaunch队列。
(2)不管是map task还是reduce task都要调用TaskLauncher中的addToTaskQueue方法
public void addToTaskQueue(LaunchTaskAction action) {
synchronized (tasksToLaunch) {
TaskInProgress tip = registerTask(action, this);
tasksToLaunch.add(tip);
tasksToLaunch.notifyAll();
}
}
(3)然后继续registerTask方法
private TaskInProgress registerTask(LaunchTaskAction action,
TaskLauncher launcher) {
//从action中获取Task对象
Task t = action.getTask();
LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
" task's state:" + t.getState());
TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
synchronized (this) {
//在相应的数据结构中增加所生成的TaskTracker.TaskInProgress对象,以通知程序其他部分该
任务的建立
tasks.put(t.getTaskID(), tip);
runningTasks.put(t.getTaskID(), tip);
boolean isMap = t.isMapTask();
if (isMap) {
mapTotal++;
} else {
reduceTotal++;
}
}
return tip;
}
同时,TaskLauncher类继承了Thread类,所以在程序运行过程中,它们各自都以一个线程独立运行。它们的启动在TaskTracker初始化过程中已经完成。该类的run函数就是不断监测taskToLaunch队列中是否有新的TaskTracker.TaskInProgress对象加入。如果有则从中取出一个对象,然后调用TaskTracker类的startNewTask(TaskInProgress tip)来启动一个task。
(4)我们看一下启动的run方法
public void run() {
while (!Thread.interrupted()) {
try {
TaskInProgress tip;
Task task;
synchronized (tasksToLaunch) {
while (tasksToLaunch.isEmpty()) {
tasksToLaunch.wait();
}
.....
synchronized (tip) {
//to make sure that there is no kill task action for this
if (!tip.canBeLaunched()) {
//got killed externally while still in the launcher queue
LOG.info("Not launching task " + task.getTaskID() + " as it got"
+ " killed externally. Task's state is " + tip.getRunState());
addFreeSlots(task.getNumSlotsRequired());
continue;
}
tip.slotTaken = true;
}
//got a free slot. launch the task
//启动一个新的Task
startNewTask(tip);
} catch (InterruptedException e) {
return; // ALL DONE
} catch (Throwable th) {
LOG.error("TaskLauncher error " +
StringUtils.stringifyException(th));
}
}
而startNewTask方法主要是调用了localizeJob(tip)方法实现本地化,完成从HDFS拷贝的TaskTracker的本地文件系统中:job.split,job.xml以及job.jar以及Task运行的必须的文件,这个过程属于非常重要的部分
(5)看一下重要的localizeJob方法
RunningJob localizeJob(TaskInProgress tip
) throws IOException, InterruptedException {
Task t = tip.getTask();
JobID jobId = t.getJobID();
RunningJob rjob = addTaskToJob(jobId, tip);
// Initialize the user directories if needed.
//初始化用户文件目录
getLocalizer().initializeUserDirs(t.getUser());
synchronized (rjob) {
if (!rjob.localized) {
//初始化本地配置文件
JobConf localJobConf = localizeJobFiles(t, rjob);
// 初始化日志目录
initializeJobLogDir(jobId, localJobConf);
// Now initialize the job via task-controller so as to set
// ownership/permissions of jars, job-work-dir. Note that initializeJob
// should be the last call after every other directory/file to be
// directly under the job directory is created.
JobInitializationContext context = new JobInitializationContext();
context.jobid = jobId;
context.user = t.getUser();
context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
taskController.initializeJob(context);
rjob.jobConf = localJobConf;
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
rjob.localized = true;
}
}
return rjob;
}
(6)然后进行了一系列本地化的操作,这个步骤比较繁琐,我们简单看一下几个方法
PathlocalJobFile = lDirAlloc.getLocalPathForWrite(
getLocalJobDir(jobId.toString())
+ Path.SEPARATOR + "job.xml",
jobFileSize, fConf);
RunningJob rjob = addTaskToJob(jobId, tip);
synchronized (rjob) {
if(!rjob.localized) {
FileSystem localFs = FileSystem.getLocal(fConf);
PathjobDir = localJobFile.getParent();
……
//将job.split拷贝到本地
systemFS.copyToLocalFile(jobFile, localJobFile);
JobConf localJobConf = new JobConf(localJobFile);
PathworkDir = lDirAlloc.getLocalPathForWrite(
(getLocalJobDir(jobId.toString())
+ Path.SEPARATOR +"work"), fConf);
if(!localFs.mkdirs(workDir)) {
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
}
System.setProperty("job.local.dir", workDir.toString());
localJobConf.set("job.local.dir", workDir.toString());
//copy Jar file to the local FS and unjar it.
String jarFile = localJobConf.getJar();
longjarFileSize = -1;
if(jarFile != null) {
Path jarFilePath = new Path(jarFile);
localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
getLocalJobDir(jobId.toString())
+Path.SEPARATOR + "jars",
5 *jarFileSize, fConf), "job.jar");
if(!localFs.mkdirs(localJarFile.getParent())) {
throw new IOException("Mkdirs failed to create jars directory");
}
//将job.jar拷贝到本地
systemFS.copyToLocalFile(jarFilePath, localJarFile);
localJobConf.setJar(localJarFile.toString());
//将job得configuration写成job.xml
OutputStream out = localFs.create(localJobFile);
try{
localJobConf.writeXml(out);
}finally {
out.close();
}
// 解压缩job.jar
RunJar.unJar(new File(localJarFile.toString()),
newFile(localJarFile.getParent().toString()));
}
rjob.localized = true;
rjob.jobConf = localJobConf;
}
}
//真正的启动此Task
launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
当所有的task运行所需要的资源都拷贝到本地后,则调用TaskTracker的launchTaskForJob方法,其又调用TaskTracker.TaskInProgress的launchTask函数
public synchronized void launchTask() throwsIOException {
……
//创建task运行目录
localizeTask(task);
if(this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
this.taskStatus.setRunState(TaskStatus.State.RUNNING);
}
//创建并启动TaskRunner,对于MapTask,创建的是MapTaskRunner,对于ReduceTask,创建的是ReduceTaskRunner
this.runner = task.createRunner(TaskTracker.this, this);
this.runner.start();
this.taskStatus.setStartTime(System.currentTimeMillis());
}
TaskRunner是抽象类,是Thread类的子类,其run函数如下:
public final void run() {
……
TaskAttemptID taskid = t.getTaskID();
LocalDirAllocator lDirAlloc = newLocalDirAllocator("mapred.local.dir");
FilejobCacheDir = null;
if(conf.getJar() != null) {
jobCacheDir = new File(
newPath(conf.getJar()).getParent().toString());
}
File workDir = newFile(lDirAlloc.getLocalPathToRead(
TaskTracker.getLocalTaskDir(
t.getJobID().toString(),
t.getTaskID().toString(),
t.isTaskCleanupTask())
+ Path.SEPARATOR + MRConstants.WORKDIR,
conf).toString());
FileSystem fileSystem;
PathlocalPath;
……
//拼写classpath
StringbaseDir;
Stringsep = System.getProperty("path.separator");
StringBuffer classPath = new StringBuffer();
//start with same classpath as parent process
classPath.append(System.getProperty("java.class.path"));
classPath.append(sep);
if(!workDir.mkdirs()) {
if(!workDir.isDirectory()) {
LOG.fatal("Mkdirs failed to create " + workDir.toString());
}
}
Stringjar = conf.getJar();
if (jar!= null) {
// ifjar exists, it into workDir
File[] libs = new File(jobCacheDir, "lib").listFiles();
if(libs != null) {
for(int i = 0; i < libs.length; i++) {
classPath.append(sep); //add libs from jar to classpath
classPath.append(libs[i]);
}
}
classPath.append(sep);
classPath.append(new File(jobCacheDir, "classes"));
classPath.append(sep);
classPath.append(jobCacheDir);
}
……
classPath.append(sep);
classPath.append(workDir);
//拼写命令行java及其参数
Vector<String> vargs = new Vector<String>(8);
Filejvm =
newFile(new File(System.getProperty("java.home"), "bin"),"java");
vargs.add(jvm.toString());
StringjavaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
javaOpts = javaOpts.replace("@taskid@", taskid.toString());
String[] javaOptsSplit = javaOpts.split(" ");
StringlibraryPath = System.getProperty("java.library.path");
if(libraryPath == null) {
libraryPath = workDir.getAbsolutePath();
} else{
libraryPath += sep + workDir;
}
booleanhasUserLDPath = false;
for(inti=0; i<javaOptsSplit.length ;i++) {
if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
javaOptsSplit[i] += sep + libraryPath;
hasUserLDPath = true;
break;
}
}
if(!hasUserLDPath) {
vargs.add("-Djava.library.path=" + libraryPath);
}
for(int i = 0; i < javaOptsSplit.length; i++) {
vargs.add(javaOptsSplit[i]);
}
//添加Child进程的临时文件夹
Stringtmp = conf.get("mapred.child.tmp", "./tmp");
PathtmpDir = new Path(tmp);
if(!tmpDir.isAbsolute()) {
tmpDir = new Path(workDir.toString(), tmp);
}
FileSystem localFs = FileSystem.getLocal(conf);
if(!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
thrownew IOException("Mkdirs failed to create " + tmpDir.toString());
}
vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
// Addclasspath.
vargs.add("-classpath");
vargs.add(classPath.toString());
//log文件夹
longlogSize = TaskLog.getTaskLogLength(conf);
vargs.add("-Dhadoop.log.dir=" +
newFile(System.getProperty("hadoop.log.dir")
).getAbsolutePath());
vargs.add("-Dhadoop.root.logger=INFO,TLA");
vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
// 运行map task和reduce task的子进程的main class是Child
vargs.add(Child.class.getName()); // main of Child
最后贴一张这个步骤的流程图,后续流程明天继续分析!!!
发表评论
-
MapReduce提交作业常见问题
2012-02-10 15:42 40022今天在hadoop集群上跑MapReduce程序,遇到的一些 ... -
MapReduce运行流程源码分析(一)
2012-02-08 13:39 2173这几天都会看一些hadoop的源代码,开始的时候总会没有 ... -
hadoop环境配置——(集群版)
2012-02-08 09:23 1735这个寒假我们根据自己的摸索,我们克服了很多困难 ... -
HDFS的文件操作
2012-02-07 11:43 8983在去年寒假的时候,我们已经完成了hadoop集群的搭建, ... -
hadoop环境配置——(单机版)
2012-02-05 15:20 23811.所需的环境 ubuntu系统 2.所需要的软件包 ...
相关推荐
全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce2.0的框架结构和源码分析,MapReduce2.0的配置与测试,MapReduce2.0运行流程,MapReduce2.0高级程序设计以及相关特性等内容。《MapReduce...
全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce 2.0的框架结构和源码分析,MapReduce 2.0的配置与测试,MapReduce 2.0运行流程,MapReduce 2.0高级程序设计以及相关特性等内容。...
059 MR作业运行流程整体分析 060 MapReduce执行流程之Shuffle和排序流程以及Map端分析 061 MapReduce执行流程之Reduce端分析 062 MapReduce Shuffle过程讲解和Map Shuffle Phase讲解 063 Reduce Shuffle Phase讲解 ...
我们基于Hadoop1.2.1源码分析MapReduceV1的处理流程。TaskTracker周期性地向JobTracker发送心跳报告,在RPC调用返回结果后,解析结果得到JobTracker下发的运行Task的指令,即LaunchTaskAction,就会在TaskTracker...
09-hdfs下载数据源码分析-getFileSystem.avi 10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 mapreduce的原理和编程 01-hdfs源码跟踪之打开输入流.avi 02-hdfs源码跟踪之打开输入流总结.avi 03-...
我们基于Hadoop1.2.1源码分析MapReduceV1的处理流程。在MapReduce程序运行的过程中,JobTracker端会在内存中维护一些与Job/Task运行相关的信息,了解这些内容对分析MapReduce程序执行流程的源码会非常有帮助。在编写...
09-hdfs下载数据源码分析-getFileSystem.avi 10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 mapreduce的原理和编程 01-hdfs源码跟踪之打开输入流.avi 02-hdfs源码跟踪之打开输入流总结.avi 03-...
基于Java电商评论数据的分析与可视化系统源码+部署说明.zip 1、该资源内项目代码都是经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化...
它提供了我们运行自己的搜索引擎所需的全部工具。 目 录 1. nutch简介...1 1.1什么是nutch..1 1.2研究nutch的原因...1 1.3 nutch的目标..1 1.4 nutch VS lucene.....2 2. nutch的安装与配置.....3 2.1 JDK...
5. nutch工作流程分析...25 5.1 爬虫...25 5.1.1 工作策略...25 5.1.2 工作流程分析....25 5.1.3 其它..27 5.2 索引...27 5.2.1 索引主要过程....27 5.2.2 工作流程分析....28 5.2.3 倒排索引(inverted index)....29...