`
yu06206
  • 浏览: 110112 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

MapReduce运行流程源码分析(二)

阅读更多


这篇博客是接着昨天分析MapReduce的流程继续进行分析的:

4.JobTracker接收Heartbeat并向TaskTracker分配任务

上一步中TaskTracker调用transmitHeartBeat方法发送Heartbeat给JobTracker,当JobTracker被RPC调用来发送heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status,boolean ,initialContact, booleanacceptNewTasks, short responseId)函数被调用。

(1)我们看一下JobTracker类的heartbeat方法

 

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
                                                  boolean restarted,
                                                  boolean initialContact,
                                                  boolean acceptNewTasks, 
                                                  short responseId) 
    throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
                " (restarted: " + restarted + 
                " initialContact: " + initialContact + 
                " acceptNewTasks: " + acceptNewTasks + ")" +
                " with responseId: " + responseId);
    }

    // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
    if (!acceptTaskTracker(status)) {
      throw new DisallowedTaskTrackerException(status);
    }
    ....
    //初始化一个HeartbeatResponse对象
    HeartbeatResponse prevHeartbeatResponse =
      trackerToHeartbeatResponseMap.get(trackerName);

    if (initialContact != true) {
      // If this isn't the 'initial contact' from the tasktracker,
      // there is something seriously wrong if the JobTracker has
      // no record of the 'previous heartbeat'; if so, ask the 
      // tasktracker to re-initialize itself.
      if (prevHeartbeatResponse == null) {
        // This is the first heartbeat from the old tracker to the newly 
        // started JobTracker
        
        // Jobtracker might have restarted but no recovery is needed
        // otherwise this code should not be reached
        LOG.warn("Serious problem, cannot find record of 'previous' " +
                 "heartbeat for '" + trackerName + 
                 "'; reinitializing the tasktracker");
        return new HeartbeatResponse(responseId, 
            new TaskTrackerAction[] {new ReinitTrackerAction()});
      
      } else {
                
        // It is completely safe to not process a 'duplicate' heartbeat from a 
        // {@link TaskTracker} since it resends the heartbeat when rpcs are 
        // lost see {@link TaskTracker.transmitHeartbeat()};
        // acknowledge it by re-sending the previous response to let the 
        // {@link TaskTracker} go forward. 
        if (prevHeartbeatResponse.getResponseId() != responseId) {
          LOG.info("Ignoring 'duplicate' heartbeat from '" + 
              trackerName + "'; resending the previous 'lost' response");
          return prevHeartbeatResponse;
        }
      }
    }
      
    // Process this heartbeat 
    short newResponseId = (short)(responseId + 1);
    status.setLastSeen(now);
    if (!processHeartbeat(status, initialContact)) {
      if (prevHeartbeatResponse != null) {
        trackerToHeartbeatResponseMap.remove(trackerName);
      }
      return new HeartbeatResponse(newResponseId, 
                   new TaskTrackerAction[] {new ReinitTrackerAction()});
    }
      
    // Initialize the response to be sent for the heartbeat
    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
    List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
    isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
    // Check for new tasks to be executed on the tasktracker
  //如果TaskTracker向JobTracker请求一个task运行
    if (acceptNewTasks && !isBlacklisted) {
      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
      if (taskTrackerStatus == null) {
        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
      } else {
	 //setup和cleanup的task的优先次序最高
        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        if (tasks == null ) {
	//taskScheduler.assignTasks方法注册一个task
          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
        }
        if (tasks != null) {
          for (Task task : tasks) {
  	//将任务放入actions列表,返回给TaskTracker
            expireLaunchingTasks.addNewTask(task.getTaskID());
            if (LOG.isDebugEnabled()) {
              LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
            }
            actions.add(new LaunchTaskAction(task));
          }
        }
      }
    }
    ....   
    return response;
  }

 

 

通过调用上面的方法其中实现了task在taskScheduler的注册,JobQueueTaskScheduler是JobTracker默认的Task调度器,上面方法中taskScheduler.assignTasks(),注册一个Task。

(2)下一步我们看一下JobQueueTaskScheduler类的assignTasks方法

 

public synchronized List<Task> assignTasks(TaskTracker taskTracker)
      throws IOException {
    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); 
    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    final int numTaskTrackers = clusterStatus.getTaskTrackers();
    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();
    ....
    //
    // Compute (running + pending) map and reduce task numbers across pool
    // 计算剩余的map和reduce的工作量:remaining
    int remainingReduceLoad = 0;
    int remainingMapLoad = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
          if (job.scheduleReduces()) {
            remainingReduceLoad += 
              (job.desiredReduces() - job.finishedReduces());
          }
        }
      }
    }

    // Compute the 'load factor' for maps and reduces
    double mapLoadFactor = 0.0;
    if (clusterMapCapacity > 0) {
      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
    }
    double reduceLoadFactor = 0.0;
    if (clusterReduceCapacity > 0) {
      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
    }
        
    
    final int trackerCurrentMapCapacity = 
      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), 
                              trackerMapCapacity);
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
    boolean exceededMapPadding = false;
    if (availableMapSlots > 0) {
      exceededMapPadding = 
        exceededPadding(true, clusterStatus, trackerMapCapacity);
    }
    //计算平均每个TaskTracker应有的工作量,remaining/numTaskTrackers是剩余的工作量除以

TaskTracker的个数。
    int numLocalMaps = 0;
    int numNonLocalMaps = 0;
    scheduleMaps:
    for (int i=0; i < availableMapSlots; ++i) {
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }

          Task t = null;
          
          // Try to schedule a node-local or rack-local Map task
          t = 
            job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                      taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numLocalMaps;
            
            // Don't assign map tasks to the hilt!
            // Leave some free slots in the cluster for future task-failures,
            // speculative tasks etc. beyond the highest priority job
            if (exceededMapPadding) {
              break scheduleMaps;
            }
           
            // Try all jobs again for the next Map task 
            break;
          }
          
          // Try to schedule a node-local or rack-local Map task
          t = 
            job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                   taskTrackerManager.getNumberOfUniqueHosts());
          
          if (t != null) {
            assignedTasks.add(t);
            ++numNonLocalMaps;
            
            // We assign at most 1 off-switch or speculative task
            // This is to prevent TaskTrackers from stealing local-tasks
            // from other TaskTrackers.
            break scheduleMaps;
          }
        }
      }
    }
    int assignedMaps = assignedTasks.size();
    ....
    return assignedTasks;
  }

 

 

以上的过程可能要经过一个复杂的计算,由jobTracker调度Task,从上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配调度map task的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找TaskInProgress.同样的道理JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。然后又JobTracker进行任务的分配,这个步骤就结束了,由于这个步骤比较简单这里就不画流程图了。

5.TaskTracker接收HeartbeatResponse并执行任务

在向JobTracker发送heartbeat后,如果返回的heartbeatreponse中含有分配好的任务LaunchTaskAction,TaskTracker则调用addToTaskQueue方法,将其加入TaskTracker类中MapLauncher或者ReduceLauncher对象的taskToLaunch队列。具体的怎么通过RPC接收到heartbeatreponse这里不做分析,接收到分配的任务后,调用

addToTaskQueue方法。

(1)所以我们先看一下addToTaskQueue方法

 

private void addToTaskQueue(LaunchTaskAction action) {
    if (action.getTask().isMapTask()) {
      mapLauncher.addToTaskQueue(action);
    } else {
      reduceLauncher.addToTaskQueue(action);
    }
  }

 

 

在此,MapLauncher和ReduceLauncher对象均为TaskLauncher类的实例。该类是TaskTracker类的一个内部类,具有一个数据成员,是TaskTracker.TaskInProgress类型的队列。在此特别注意,在TaskTracker类内部所提到的TaskInProgress类均为TaskTracker的内部类,我们用TaskTracker.TaskInProgress表示,一定要和MapRed包中的TaskInProgress类区分,后者我们直接用TaskInProgress表示。如果应答包中包含的任务是map task则放入mapLancher的taskToLaunch队列,如果是reduce task则放入reduceLancher的taskToLaunch队列。

(2)不管是map task还是reduce task都要调用TaskLauncher中的addToTaskQueue方法

 

public void addToTaskQueue(LaunchTaskAction action) {
      synchronized (tasksToLaunch) {
        TaskInProgress tip = registerTask(action, this);
        tasksToLaunch.add(tip);
        tasksToLaunch.notifyAll();
      }
    }

 

 

(3)然后继续registerTask方法

 

 private TaskInProgress registerTask(LaunchTaskAction action, 
      TaskLauncher launcher) {
  //从action中获取Task对象
    Task t = action.getTask();
    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
             " task's state:" + t.getState());
    TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
    synchronized (this) {
   //在相应的数据结构中增加所生成的TaskTracker.TaskInProgress对象,以通知程序其他部分该

任务的建立
      tasks.put(t.getTaskID(), tip);
      runningTasks.put(t.getTaskID(), tip);
      boolean isMap = t.isMapTask();
      if (isMap) {
        mapTotal++;
      } else {
        reduceTotal++;
      }
    }
    return tip;
  }

 

 

同时,TaskLauncher类继承了Thread类,所以在程序运行过程中,它们各自都以一个线程独立运行。它们的启动在TaskTracker初始化过程中已经完成。该类的run函数就是不断监测taskToLaunch队列中是否有新的TaskTracker.TaskInProgress对象加入。如果有则从中取出一个对象,然后调用TaskTracker类的startNewTask(TaskInProgress tip)来启动一个task。

(4)我们看一下启动的run方法

 

 public void run() {
      while (!Thread.interrupted()) {
        try {
          TaskInProgress tip;
          Task task;
          synchronized (tasksToLaunch) {
            while (tasksToLaunch.isEmpty()) {
              tasksToLaunch.wait();
            }
           .....
          synchronized (tip) {
            //to make sure that there is no kill task action for this
            if (!tip.canBeLaunched()) {
              //got killed externally while still in the launcher queue
              LOG.info("Not launching task " + task.getTaskID() + " as it got"
                + " killed externally. Task's state is " + tip.getRunState());
              addFreeSlots(task.getNumSlotsRequired());
              continue;
            }
            tip.slotTaken = true;
          }
          //got a free slot. launch the task
          //启动一个新的Task
          startNewTask(tip);
        } catch (InterruptedException e) { 
          return; // ALL DONE
        } catch (Throwable th) {
          LOG.error("TaskLauncher error " + 
              StringUtils.stringifyException(th));
        }
      }

 而startNewTask方法主要是调用了localizeJob(tip)方法实现本地化,完成从HDFS拷贝的TaskTracker的本地文件系统中:job.split,job.xml以及job.jar以及Task运行的必须的文件,这个过程属于非常重要的部分

 

(5)看一下重要的localizeJob方法

 

 RunningJob localizeJob(TaskInProgress tip
                           ) throws IOException, InterruptedException {
    Task t = tip.getTask();
    JobID jobId = t.getJobID();
    RunningJob rjob = addTaskToJob(jobId, tip);

    // Initialize the user directories if needed.
    //初始化用户文件目录
    getLocalizer().initializeUserDirs(t.getUser());

    synchronized (rjob) {
      if (!rjob.localized) {
       //初始化本地配置文件
        JobConf localJobConf = localizeJobFiles(t, rjob);
        // 初始化日志目录
        initializeJobLogDir(jobId, localJobConf);

        // Now initialize the job via task-controller so as to set
        // ownership/permissions of jars, job-work-dir. Note that initializeJob
        // should be the last call after every other directory/file to be
        // directly under the job directory is created.
	
        JobInitializationContext context = new JobInitializationContext();
        context.jobid = jobId;
        context.user = t.getUser();
        context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
        taskController.initializeJob(context);

        rjob.jobConf = localJobConf;
        rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                             localJobConf.getKeepFailedTaskFiles());
        rjob.localized = true;
      }
    }
    return rjob;
  }

 

 

(6)然后进行了一系列本地化的操作,这个步骤比较繁琐,我们简单看一下几个方法

 

PathlocalJobFile = lDirAlloc.getLocalPathForWrite(

                  getLocalJobDir(jobId.toString())

                 + Path.SEPARATOR + "job.xml",

                  jobFileSize, fConf);

 RunningJob rjob = addTaskToJob(jobId, tip);

 synchronized (rjob) {

    if(!rjob.localized) {

     FileSystem localFs = FileSystem.getLocal(fConf);

      PathjobDir = localJobFile.getParent();

      ……

      //将job.split拷贝到本地

     systemFS.copyToLocalFile(jobFile, localJobFile);

     JobConf localJobConf = new JobConf(localJobFile);

      PathworkDir = lDirAlloc.getLocalPathForWrite(

                      (getLocalJobDir(jobId.toString())

                       + Path.SEPARATOR +"work"), fConf);

      if(!localFs.mkdirs(workDir)) {

       throw new IOException("Mkdirs failed to create "

                    + workDir.toString());

      }

     System.setProperty("job.local.dir", workDir.toString());

     localJobConf.set("job.local.dir", workDir.toString());

      //copy Jar file to the local FS and unjar it.

     String jarFile = localJobConf.getJar();

      longjarFileSize = -1;

      if(jarFile != null) {

       Path jarFilePath = new Path(jarFile);

       localJarFile = new Path(lDirAlloc.getLocalPathForWrite(

                                  getLocalJobDir(jobId.toString())

                                   +Path.SEPARATOR + "jars",

                                   5 *jarFileSize, fConf), "job.jar");

        if(!localFs.mkdirs(localJarFile.getParent())) {

         throw new IOException("Mkdirs failed to create jars directory");

        }

        //将job.jar拷贝到本地

       systemFS.copyToLocalFile(jarFilePath, localJarFile);

       localJobConf.setJar(localJarFile.toString());

       //将job得configuration写成job.xml

       OutputStream out = localFs.create(localJobFile);

        try{

         localJobConf.writeXml(out);

        }finally {

         out.close();

        }

        // 解压缩job.jar

       RunJar.unJar(new File(localJarFile.toString()),

                     newFile(localJarFile.getParent().toString()));

      }

     rjob.localized = true;

     rjob.jobConf = localJobConf;

    }

  }

  //真正的启动此Task

 launchTaskForJob(tip, new JobConf(rjob.jobConf));

}

当所有的task运行所需要的资源都拷贝到本地后,则调用TaskTracker的launchTaskForJob方法,其又调用TaskTracker.TaskInProgress的launchTask函数

 

public synchronized void launchTask() throwsIOException {

    ……

    //创建task运行目录

   localizeTask(task);

    if(this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {

     this.taskStatus.setRunState(TaskStatus.State.RUNNING);

    }

    //创建并启动TaskRunner,对于MapTask,创建的是MapTaskRunner,对于ReduceTask,创建的是ReduceTaskRunner

   this.runner = task.createRunner(TaskTracker.this, this);

   this.runner.start();

   this.taskStatus.setStartTime(System.currentTimeMillis());

}

TaskRunner是抽象类,是Thread类的子类,其run函数如下:

public final void run() {

    ……

   TaskAttemptID taskid = t.getTaskID();

   LocalDirAllocator lDirAlloc = newLocalDirAllocator("mapred.local.dir");

    FilejobCacheDir = null;

    if(conf.getJar() != null) {

     jobCacheDir = new File(

                        newPath(conf.getJar()).getParent().toString());

    }

    File workDir = newFile(lDirAlloc.getLocalPathToRead(

                             TaskTracker.getLocalTaskDir(

                               t.getJobID().toString(),

                               t.getTaskID().toString(),

                                t.isTaskCleanupTask())

           + Path.SEPARATOR + MRConstants.WORKDIR,

                              conf).toString());

   FileSystem fileSystem;

    PathlocalPath;

    ……

    //拼写classpath

    StringbaseDir;

    Stringsep = System.getProperty("path.separator");

   StringBuffer classPath = new StringBuffer();

    //start with same classpath as parent process

   classPath.append(System.getProperty("java.class.path"));

   classPath.append(sep);

    if(!workDir.mkdirs()) {

      if(!workDir.isDirectory()) {

       LOG.fatal("Mkdirs failed to create " + workDir.toString());

      }

    }

    Stringjar = conf.getJar();

    if (jar!= null) {     

      // ifjar exists, it into workDir

     File[] libs = new File(jobCacheDir, "lib").listFiles();

      if(libs != null) {

        for(int i = 0; i < libs.length; i++) {

         classPath.append(sep);         //add libs from jar to classpath

         classPath.append(libs[i]);

        }

      }

     classPath.append(sep);

     classPath.append(new File(jobCacheDir, "classes"));

     classPath.append(sep);

     classPath.append(jobCacheDir);

    }

    ……

   classPath.append(sep);

   classPath.append(workDir);

    //拼写命令行java及其参数

   Vector<String> vargs = new Vector<String>(8);

    Filejvm =

      newFile(new File(System.getProperty("java.home"), "bin"),"java");

   vargs.add(jvm.toString());

    StringjavaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");

   javaOpts = javaOpts.replace("@taskid@", taskid.toString());

    String[] javaOptsSplit = javaOpts.split(" ");

    StringlibraryPath = System.getProperty("java.library.path");

    if(libraryPath == null) {

     libraryPath = workDir.getAbsolutePath();

    } else{

     libraryPath += sep + workDir;

    }

    booleanhasUserLDPath = false;

    for(inti=0; i<javaOptsSplit.length ;i++) {

     if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {

       javaOptsSplit[i] += sep + libraryPath;

        hasUserLDPath = true;

       break;

      }

    }

   if(!hasUserLDPath) {

     vargs.add("-Djava.library.path=" + libraryPath);

    }

    for(int i = 0; i < javaOptsSplit.length; i++) {

     vargs.add(javaOptsSplit[i]);

    }

    //添加Child进程的临时文件夹

    Stringtmp = conf.get("mapred.child.tmp", "./tmp");

    PathtmpDir = new Path(tmp);

    if(!tmpDir.isAbsolute()) {

     tmpDir = new Path(workDir.toString(), tmp);

    }

   FileSystem localFs = FileSystem.getLocal(conf);

    if(!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {

      thrownew IOException("Mkdirs failed to create " + tmpDir.toString());

    }

   vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());

    // Addclasspath.

   vargs.add("-classpath");

   vargs.add(classPath.toString());

    //log文件夹

    longlogSize = TaskLog.getTaskLogLength(conf);

   vargs.add("-Dhadoop.log.dir=" +

        newFile(System.getProperty("hadoop.log.dir")

       ).getAbsolutePath());

   vargs.add("-Dhadoop.root.logger=INFO,TLA");

   vargs.add("-Dhadoop.tasklog.taskid=" + taskid);

   vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);

    // 运行map task和reduce task的子进程的main class是Child

   vargs.add(Child.class.getName()); // main of Child

 最后贴一张这个步骤的流程图,后续流程明天继续分析!!!

 

  • 大小: 37.3 KB
分享到:
评论

相关推荐

    MapReduce 2.0源码分析与编程实

    全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce2.0的框架结构和源码分析,MapReduce2.0的配置与测试,MapReduce2.0运行流程,MapReduce2.0高级程序设计以及相关特性等内容。《MapReduce...

    MapReduce2.0源码分析与实战编程

    全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce 2.0的框架结构和源码分析,MapReduce 2.0的配置与测试,MapReduce 2.0运行流程,MapReduce 2.0高级程序设计以及相关特性等内容。...

    Hadoop从入门到上手企业开发

    059 MR作业运行流程整体分析 060 MapReduce执行流程之Shuffle和排序流程以及Map端分析 061 MapReduce执行流程之Reduce端分析 062 MapReduce Shuffle过程讲解和Map Shuffle Phase讲解 063 Reduce Shuffle Phase讲解 ...

    MapReduceV1:TaskTracker端启动Task流程分析

    我们基于Hadoop1.2.1源码分析MapReduceV1的处理流程。TaskTracker周期性地向JobTracker发送心跳报告,在RPC调用返回结果后,解析结果得到JobTracker下发的运行Task的指令,即LaunchTaskAction,就会在TaskTracker...

    hadoop段海涛老师八天实战视频

    09-hdfs下载数据源码分析-getFileSystem.avi 10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 mapreduce的原理和编程 01-hdfs源码跟踪之打开输入流.avi 02-hdfs源码跟踪之打开输入流总结.avi 03-...

    MapReduceV1:JobTracker端Job/Task数据结构

    我们基于Hadoop1.2.1源码分析MapReduceV1的处理流程。在MapReduce程序运行的过程中,JobTracker端会在内存中维护一些与Job/Task运行相关的信息,了解这些内容对分析MapReduce程序执行流程的源码会非常有帮助。在编写...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    09-hdfs下载数据源码分析-getFileSystem.avi 10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 mapreduce的原理和编程 01-hdfs源码跟踪之打开输入流.avi 02-hdfs源码跟踪之打开输入流总结.avi 03-...

    基于Java电商评论数据的分析与可视化系统源码+部署说明.zip

    基于Java电商评论数据的分析与可视化系统源码+部署说明.zip 1、该资源内项目代码都是经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化...

    Nutch入门.rar

    它提供了我们运行自己的搜索引擎所需的全部工具。 目 录 1. nutch简介...1 1.1什么是nutch..1 1.2研究nutch的原因...1 1.3 nutch的目标..1 1.4 nutch VS lucene.....2 2. nutch的安装与配置.....3 2.1 JDK...

    nutch 初学文档教材

    5. nutch工作流程分析...25 5.1 爬虫...25 5.1.1 工作策略...25 5.1.2 工作流程分析....25 5.1.3 其它..27 5.2 索引...27 5.2.1 索引主要过程....27 5.2.2 工作流程分析....28 5.2.3 倒排索引(inverted index)....29...

Global site tag (gtag.js) - Google Analytics