From e1637a57dfd41385dbce5de90620c48a45abb263 Mon Sep 17 00:00:00 2001
From: Masatake Iwasaki
The most important difference is that unlike GFS, Hadoop DFS files +have strictly one writer at any one time. Bytes are always appended +to the end of the writer's stream. There is no notion of "record appends" +or "mutations" that are then checked or reordered. Writers simply emit +a byte stream. That byte stream is guaranteed to be stored in the +order written.
]]> + + +DistributedCache
is a facility provided by the Map-Reduce
+ framework to cache files (text, archives, jars etc.) needed by applications.
+
+
+ Applications specify the files, via urls (hdfs:// or http://) to be cached
+ via the {@link org.apache.hadoop.mapred.JobConf}. The
+ DistributedCache
assumes that the files specified via urls are
+ already present on the {@link FileSystem} at the path specified by the url
+ and are accessible by every machine in the cluster.
The framework will copy the necessary files on to the worker node before + any tasks for the job are executed on that node. Its efficiency stems from + the fact that the files are only copied once per job and the ability to + cache archives which are un-archived on the workers.
+ +DistributedCache
can be used to distribute simple, read-only
+ data/text files and/or more complex types such as archives, jars etc.
+ Archives (zip, tar and tgz/tar.gz files) are un-archived at the worker nodes.
+ Jars may be optionally added to the classpath of the tasks, a rudimentary
+ software distribution mechanism. Files have execution permissions.
+ In older version of Hadoop Map/Reduce users could optionally ask for symlinks
+ to be created in the working directory of the child task. In the current
+ version symlinks are always created. If the URL does not have a fragment
+ the name of the file or directory will be used. If multiple files or
+ directories map to the same link name, the last one added, will be used. All
+ others will not even be downloaded.
DistributedCache
tracks modification timestamps of the cache
+ files. Clearly the cache files should not be modified by the application
+ or externally while the job is executing.
Here is an illustrative example on how to use the
+ DistributedCache
:
+ + It is also very common to use the DistributedCache by using + {@link org.apache.hadoop.util.GenericOptionsParser}. + + This class includes methods that should be used by users + (specifically those mentioned in the example above, as well + as {@link DistributedCache#addArchiveToClassPath(Path, Configuration)}), + as well as methods intended for use by the MapReduce framework + (e.g., {@link org.apache.hadoop.mapred.JobClient}). + + @see org.apache.hadoop.mapred.JobConf + @see org.apache.hadoop.mapred.JobClient + @see org.apache.hadoop.mapreduce.Job]]> ++ // Setting up the cache for the application + + 1. Copy the requisite files to theFileSystem
: + + $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat + $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip + $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar + $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar + $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz + $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz + + 2. Setup the application'sJobConf
: + + JobConf job = new JobConf(); + DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), + job); + DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job); + DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); + DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job); + DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job); + DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job); + + 3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper} + or {@link org.apache.hadoop.mapred.Reducer}: + + public static class MapClass extends MapReduceBase + implements Mapper<K, V, K, V> { + + private Path[] localArchives; + private Path[] localFiles; + + public void configure(JobConf job) { + // Get the cached archives/files + File f = new File("./map.zip/some/file/in/zip.txt"); + } + + public void map(K key, V value, + OutputCollector<K, V> output, Reporter reporter) + throws IOException { + // Use data from the cached archives/files here + // ... + // ... + output.collect(k, v); + } + } + +
JobTracker
.]]>
+ ClusterStatus
provides clients with information such as:
+ JobTracker
.
+ Clients can query for the latest ClusterStatus
, via
+ {@link JobClient#getClusterStatus()}.
Counters
represent global counters, defined either by the
+ Map-Reduce framework or applications. Each Counter
can be of
+ any {@link Enum} type.
+
+ Counters
are bunched into {@link Group}s, each comprising of
+ counters from a particular Enum
class.]]>
+
Group
handles localization of the class name and the
+ counter names.
FileInputFormat
implementations can override this and return
+ false
to ensure that individual input files are never split-up
+ so that {@link Mapper}s process entire files.
+
+ @param fs the file system that the file is on
+ @param filename the file name to check
+ @return is this file splitable?]]>
+ FileInputFormat
is the base class for all file-based
+ InputFormat
s. This provides a generic implementation of
+ {@link #getSplits(JobConf, int)}.
+
+ Implementations of FileInputFormat
can also override the
+ {@link #isSplitable(FileSystem, Path)} method to prevent input files
+ from being split-up in certain situations. Implementations that may
+ deal with non-splittable files must override this method, since
+ the default implementation assumes splitting is always possible.]]>
+ false
otherwise]]>
+ Note: The following is valid only if the {@link OutputCommitter}
+ is {@link FileOutputCommitter}. If OutputCommitter
is not
+ a FileOutputCommitter
, the task's temporary output
+ directory is same as {@link #getOutputPath(JobConf)} i.e.
+ ${mapreduce.output.fileoutputformat.outputdir}$
Some applications need to create/write-to side-files, which differ from + the actual job-outputs. + +
In such cases there could be issues with 2 instances of the same TIP + (running simultaneously e.g. speculative tasks) trying to open/write-to the + same file (path) on HDFS. Hence the application-writer will have to pick + unique names per task-attempt (e.g. using the attemptid, say + attempt_200709221812_0001_m_000000_0), not just per TIP.
+ +To get around this the Map-Reduce framework helps the application-writer + out by maintaining a special + ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} + sub-directory for each task-attempt on HDFS where the output of the + task-attempt goes. On successful completion of the task-attempt the files + in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) + are promoted to ${mapreduce.output.fileoutputformat.outputdir}. Of course, the + framework discards the sub-directory of unsuccessful task-attempts. This + is completely transparent to the application.
+ +The application-writer can take advantage of this by creating any + side-files required in ${mapreduce.task.output.dir} during execution + of his reduce-task i.e. via {@link #getWorkOutputPath(JobConf)}, and the + framework will move them out similarly - thus she doesn't have to pick + unique paths per task-attempt.
+ +Note: the value of ${mapreduce.task.output.dir} during + execution of a particular task-attempt is actually + ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid}, and this value is + set by the map-reduce framework. So, just create any side-files in the + path returned by {@link #getWorkOutputPath(JobConf)} from map/reduce + task to take advantage of this feature.
+ +The entire discussion holds true for maps of jobs with + reducer=NONE (i.e. 0 reduces) since output of the map, in that case, + goes directly to HDFS.
+ + @return the {@link Path} to the task's temporary output directory + for the map-reduce job.]]> +The given name is postfixed with the task type, 'm' for maps, 'r' for + reduces and the task partition number. For example, give a name 'test' + running on the first map o the job the generated name will be + 'test-m-00000'.
+ + @param conf the configuration for the job. + @param name the name to make unique. + @return a unique name accross all tasks of the job.]]> +This method uses the {@link #getUniqueName} method to make the file name + unique for the task.
+ + @param conf the configuration for the job. + @param name the name for the file. + @return a unique path accross all tasks of the job.]]> +Note: The split is a logical split of the inputs and the + input files are not physically split into chunks. For e.g. a split could + be <input-file-path, start, offset> tuple. + + @param job job configuration. + @param numSplits the desired number of splits, a hint. + @return an array of {@link InputSplit}s for the job.]]> +
RecordReader
to respect
+ record boundaries while processing the logical split to present a
+ record-oriented view to the individual task.
+
+ @param split the {@link InputSplit}
+ @param job the job that this split belongs to
+ @return a {@link RecordReader}]]>
+ The Map-Reduce framework relies on the InputFormat
of the
+ job to:
+
InputSplit
for processing by
+ the {@link Mapper}.
+ The default behavior of file-based {@link InputFormat}s, typically + sub-classes of {@link FileInputFormat}, is to split the + input into logical {@link InputSplit}s based on the total size, in + bytes, of the input files. However, the {@link FileSystem} blocksize of + the input files is treated as an upper bound for input splits. A lower bound + on the split size can be set via + + mapreduce.input.fileinputformat.split.minsize.
+ +Clearly, logical splits based on input-size is insufficient for many
+ applications since record boundaries are to be respected. In such cases, the
+ application has to also implement a {@link RecordReader} on whom lies the
+ responsibilty to respect record-boundaries and present a record-oriented
+ view of the logical InputSplit
to the individual task.
+
+ @see InputSplit
+ @see RecordReader
+ @see JobClient
+ @see FileInputFormat]]>
+
String
s.
+ @throws IOException]]>
+ Typically, it presents a byte-oriented view on the input and is the + responsibility of {@link RecordReader} of the job to process this and present + a record-oriented view. + + @see InputFormat + @see RecordReader]]> +
JobClient
provides facilities to submit jobs, track their
+ progress, access component-tasks' reports/logs, get the Map-Reduce cluster
+ status information etc.
+
+ The job submission process involves: +
JobClient
to submit
+ the job and monitor its progress.
+
+ Here is an example on how to use JobClient
:
+ + Job Control + ++ // Create a new JobConf + JobConf job = new JobConf(new Configuration(), MyJob.class); + + // Specify various job-specific parameters + job.setJobName("myjob"); + + job.setInputPath(new Path("in")); + job.setOutputPath(new Path("out")); + + job.setMapperClass(MyJob.MyMapper.class); + job.setReducerClass(MyJob.MyReducer.class); + + // Submit the job, then poll for progress until the job is complete + JobClient.runJob(job); +
At times clients would chain map-reduce jobs to accomplish complex tasks + which cannot be done via a single map-reduce job. This is fairly easy since + the output of the job, typically, goes to distributed file-system and that + can be used as the input for the next job.
+ +However, this also means that the onus on ensuring jobs are complete + (success/failure) lies squarely on the clients. In such situations the + various job-control options are: +
false
otherwise.]]>
+ false
otherwise.]]>
+ For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed + in a single call to the reduce function if K1 and K2 compare as equal.
+ +Since {@link #setOutputKeyComparatorClass(Class)} can be used to control + how keys are sorted, this can be used in conjunction to simulate + secondary sort on values.
+ +Note: This is not a guarantee of the combiner sort being + stable in any sense. (In any case, with the order of available + map-outputs to the combiner being non-deterministic, it wouldn't make + that much sense.)
+ + @param theClass the comparator class to be used for grouping keys for the + combiner. It should implementRawComparator
.
+ @see #setOutputKeyComparatorClass(Class)]]>
+ For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed + in a single call to the reduce function if K1 and K2 compare as equal.
+ +Since {@link #setOutputKeyComparatorClass(Class)} can be used to control + how keys are sorted, this can be used in conjunction to simulate + secondary sort on values.
+ +Note: This is not a guarantee of the reduce sort being + stable in any sense. (In any case, with the order of available + map-outputs to the reduce being non-deterministic, it wouldn't make + that much sense.)
+ + @param theClass the comparator class to be used for grouping keys. + It should implementRawComparator
.
+ @see #setOutputKeyComparatorClass(Class)
+ @see #setCombinerKeyGroupingComparator(Class)]]>
+ The combiner is an application-specified aggregation operation, which + can help cut down the amount of data transferred between the + {@link Mapper} and the {@link Reducer}, leading to better performance.
+ +The framework may invoke the combiner 0, 1, or multiple times, in both + the mapper and reducer tasks. In general, the combiner is called as the + sort/merge result is written to disk. The combiner must: +
Typically the combiner is same as the Reducer
for the
+ job i.e. {@link #setReducerClass(Class)}.
true
if speculative execution be used for this job,
+ false
otherwise.]]>
+ false
.]]>
+ true
if speculative execution be
+ used for this job for map tasks,
+ false
otherwise.]]>
+ false
.]]>
+ true
if speculative execution be used
+ for reduce tasks for this job,
+ false
otherwise.]]>
+ false
.]]>
+ The number of maps is usually driven by the total size of the inputs + i.e. total number of blocks of the input files.
+ +The right level of parallelism for maps seems to be around 10-100 maps + per-node, although it has been set up to 300 or so for very cpu-light map + tasks. Task setup takes awhile, so it is best if the maps take at least a + minute to execute.
+ +The default behavior of file-based {@link InputFormat}s is to split the + input into logical {@link InputSplit}s based on the total size, in + bytes, of input files. However, the {@link FileSystem} blocksize of the + input files is treated as an upper bound for input splits. A lower bound + on the split size can be set via + + mapreduce.input.fileinputformat.split.minsize.
+ +Thus, if you expect 10TB of input data and have a blocksize of 128MB, + you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is + used to set it even higher.
+ + @param n the number of map tasks for this job. + @see InputFormat#getSplits(JobConf, int) + @see FileInputFormat + @see FileSystem#getDefaultBlockSize() + @see FileStatus#getBlockSize()]]> +The right number of reduces seems to be 0.95
or
+ 1.75
multiplied by (
+ available memory for reduce tasks
+ (The value of this should be smaller than
+ numNodes * yarn.nodemanager.resource.memory-mb
+ since the resource of memory is shared by map tasks and other
+ applications) /
+
+ mapreduce.reduce.memory.mb).
+
With 0.95
all of the reduces can launch immediately and
+ start transfering map outputs as the maps finish. With 1.75
+ the faster nodes will finish their first round of reduces and launch a
+ second wave of reduces doing a much better job of load balancing.
Increasing the number of reduces increases the framework overhead, but + increases load balancing and lowers the cost of failures.
+ +The scaling factors above are slightly less than whole numbers to + reserve a few reduce slots in the framework for speculative-tasks, failures + etc.
+ + Reducer NONE + +It is legal to set the number of reduce-tasks to zero
.
In this case the output of the map-tasks directly go to distributed + file-system, to the path set by + {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the + framework doesn't sort the map-outputs before writing it out to HDFS.
+ + @param n the number of reduce tasks for this job.]]> +zero
, i.e. any failed map-task results in
+ the job being declared as {@link JobStatus#FAILED}.
+
+ @return the maximum percentage of map tasks that can fail without
+ the job being aborted.]]>
+ zero
, i.e. any failed reduce-task results
+ in the job being declared as {@link JobStatus#FAILED}.
+
+ @return the maximum percentage of reduce tasks that can fail without
+ the job being aborted.]]>
+ The debug command, run on the node where the map failed, is:
++ ++ $script $stdout $stderr $syslog $jobconf. +
The script file is distributed through {@link DistributedCache} + APIs. The script needs to be symlinked.
+ +Here is an example on how to submit a script +
+ + @param mDbgScript the script name]]> ++ job.setMapDebugScript("./myscript"); + DistributedCache.createSymlink(job); + DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); +
The debug command, run on the node where the map failed, is:
++ ++ $script $stdout $stderr $syslog $jobconf. +
The script file is distributed through {@link DistributedCache} + APIs. The script file needs to be symlinked
+ +Here is an example on how to submit a script +
+ + @param rDbgScript the script name]]> ++ job.setReduceDebugScript("./myscript"); + DistributedCache.createSymlink(job); + DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); +
This is typically used by application-writers to implement chaining of + Map-Reduce jobs in an asynchronous manner.
+ + @param uri the job end notification uri + @see JobStatus]]> +
+ ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/
.
+ This directory is exposed to the users through
+ mapreduce.job.local.dir
.
+ So, the tasks can use this space
+ as scratch space and share files among them.
+ This value is available as System property also.
+
+ @return The localized job specific shared directory]]>
+ + For backward compatibility, if the job configuration sets the + key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned. + Otherwise, this method will return the larger of the values returned by + {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()} + after converting them into bytes. + + @return Memory required to run a task of this job, in bytes. + @see #setMaxVirtualMemoryForTask(long) + @deprecated Use {@link #getMemoryForMapTask()} and + {@link #getMemoryForReduceTask()}]]> +
$key
on
+ Linux or %key%
on Windows.
+
+ Example:
+ $key
on
+ Linux or %key%
on Windows.
+
+ Example:
+ .VARNAME
to this configuration key, where VARNAME is
+ the name of the environment variable.
+
+ Example:
+ $key
on
+ Linux or %key%
on Windows.
+
+ Example:
+ .VARNAME
to this configuration key, where VARNAME is
+ the name of the environment variable.
+
+ Example:
+ JobConf
is the primary interface for a user to describe a
+ map-reduce job to the Hadoop framework for execution. The framework tries to
+ faithfully execute the job as-is described by JobConf
, however:
+ JobConf
typically specifies the {@link Mapper}, combiner
+ (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
+ {@link OutputFormat} implementations to be used etc.
+
+
Optionally JobConf
is used to specify other advanced facets
+ of the job such as Comparator
s to be used, files to be put in
+ the {@link DistributedCache}, whether or not intermediate and/or job outputs
+ are to be compressed (and how), debugability via user-provided scripts
+ ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}),
+ for doing post-processing on task logs, task's stdout, stderr, syslog.
+ and etc.
Here is an example on how to configure a job via JobConf
:
+ + @see JobClient + @see ClusterStatus + @see Tool + @see DistributedCache]]> ++ // Create a new JobConf + JobConf job = new JobConf(new Configuration(), MyJob.class); + + // Specify various job-specific parameters + job.setJobName("myjob"); + + FileInputFormat.setInputPaths(job, new Path("in")); + FileOutputFormat.setOutputPath(job, new Path("out")); + + job.setMapperClass(MyJob.MyMapper.class); + job.setCombinerClass(MyJob.MyReducer.class); + job.setReducerClass(MyJob.MyReducer.class); + + job.setInputFormat(SequenceFileInputFormat.class); + job.setOutputFormat(SequenceFileOutputFormat.class); +
+ JobID.getTaskIDsPattern("200707121733", null); ++ which will return : +
"job_200707121733_[0-9]*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @return a regex pattern matching JobIDs]]> +
job_200707121733_0003
, which represents the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse JobID strings, but rather + use appropriate constructors or {@link #forName(String)} method. + + @see TaskID + @see TaskAttemptID]]> +
Applications can use the {@link Reporter} provided to report progress + or just indicate that they are alive. In scenarios where the application + takes significant amount of time to process individual key/value + pairs, this is crucial since the framework might assume that the task has + timed-out and kill that task. The other way of avoiding this is to set + + mapreduce.task.timeout to a high-enough value (or even zero for no + time-outs).
+ + @param key the input key. + @param value the input value. + @param output collects mapped keys and values. + @param reporter facility to report progress.]]> +The Hadoop Map-Reduce framework spawns one map task for each
+ {@link InputSplit} generated by the {@link InputFormat} for the job.
+ Mapper
implementations can access the {@link JobConf} for the
+ job via the {@link JobConfigurable#configure(JobConf)} and initialize
+ themselves. Similarly they can use the {@link Closeable#close()} method for
+ de-initialization.
The framework then calls
+ {@link #map(Object, Object, OutputCollector, Reporter)}
+ for each key/value pair in the InputSplit
for that task.
All intermediate values associated with a given output key are
+ subsequently grouped by the framework, and passed to a {@link Reducer} to
+ determine the final output. Users can control the grouping by specifying
+ a Comparator
via
+ {@link JobConf#setOutputKeyComparatorClass(Class)}.
The grouped Mapper
outputs are partitioned per
+ Reducer
. Users can control which keys (and hence records) go to
+ which Reducer
by implementing a custom {@link Partitioner}.
+
+
Users can optionally specify a combiner
, via
+ {@link JobConf#setCombinerClass(Class)}, to perform local aggregation of the
+ intermediate outputs, which helps to cut down the amount of data transferred
+ from the Mapper
to the Reducer
.
+
+
The intermediate, grouped outputs are always stored in
+ {@link SequenceFile}s. Applications can specify if and how the intermediate
+ outputs are to be compressed and which {@link CompressionCodec}s are to be
+ used via the JobConf
.
If the job has
+ zero
+ reduces then the output of the Mapper
is directly written
+ to the {@link FileSystem} without grouping by keys.
Example:
++ ++ public class MyMapper<K extends WritableComparable, V extends Writable> + extends MapReduceBase implements Mapper<K, V, K, V> { + + static enum MyCounters { NUM_RECORDS } + + private String mapTaskId; + private String inputFile; + private int noRecords = 0; + + public void configure(JobConf job) { + mapTaskId = job.get(JobContext.TASK_ATTEMPT_ID); + inputFile = job.get(JobContext.MAP_INPUT_FILE); + } + + public void map(K key, V val, + OutputCollector<K, V> output, Reporter reporter) + throws IOException { + // Process the <key, value> pair (assume this takes a while) + // ... + // ... + + // Let the framework know that we are alive, and kicking! + // reporter.progress(); + + // Process some more + // ... + // ... + + // Increment the no. of <key, value> pairs processed + ++noRecords; + + // Increment counters + reporter.incrCounter(NUM_RECORDS, 1); + + // Every 100 records update application-level status + if ((noRecords%100) == 0) { + reporter.setStatus(mapTaskId + " processed " + noRecords + + " from input-file: " + inputFile); + } + + // Output the result + output.collect(key, val); + } + } +
Applications may write a custom {@link MapRunnable} to exert greater
+ control on map processing e.g. multi-threaded Mapper
s etc.
Mapping of input records to output records is complete when this method + returns.
+ + @param input the {@link RecordReader} to read the input records. + @param output the {@link OutputCollector} to collect the outputrecords. + @param reporter {@link Reporter} to report progress, status-updates etc. + @throws IOException]]> +MapRunnable
can exert greater
+ control on map processing e.g. multi-threaded, asynchronous mappers etc.
+
+ @see Mapper]]>
+ RecordReader
's for MultiFileSplit
's.
+ @see MultiFileSplit]]>
+ OutputCollector
is the generalization of the facility
+ provided by the Map-Reduce framework to collect data output by either the
+ Mapper
or the Reducer
i.e. intermediate outputs
+ or the output of the job.
false
otherwise
+ @throws IOException
+ @see #recoverTask(TaskAttemptContext)]]>
+ false
otherwise
+ @throws IOException]]>
+ The Map-Reduce framework relies on the OutputCommitter
of
+ the job to:
+
The Map-Reduce framework relies on the OutputFormat
of the
+ job to:
+
key
.]]>
+ Partitioner
controls the partitioning of the keys of the
+ intermediate map-outputs. The key (or a subset of the key) is used to derive
+ the partition, typically by a hash function. The total number of partitions
+ is the same as the number of reduce tasks for the job. Hence this controls
+ which of the m
reduce tasks the intermediate key (and hence the
+ record) is sent for reduction.
+
+ Note: A Partitioner
is created only when there are multiple
+ reducers.
1.0
.
+ @throws IOException]]>
+ RecordReader
, typically, converts the byte-oriented view of
+ the input, provided by the InputSplit
, and presents a
+ record-oriented view for the {@link Mapper} and {@link Reducer} tasks for
+ processing. It thus assumes the responsibility of processing record
+ boundaries and presenting the tasks with keys and values.
RecordWriter
implementations write the job outputs to the
+ {@link FileSystem}.
+
+ @see OutputFormat]]>
+
The framework calls this method for each
+ <key, (list of values)>
pair in the grouped inputs.
+ Output values must be of the same type as input values. Input keys must
+ not be altered. The framework will reuse the key and value objects
+ that are passed into the reduce, therefore the application should clone
+ the objects they want to keep a copy of. In many cases, all values are
+ combined into zero or one value.
+
Output pairs are collected with calls to + {@link OutputCollector#collect(Object,Object)}.
+ +Applications can use the {@link Reporter} provided to report progress + or just indicate that they are alive. In scenarios where the application + takes a significant amount of time to process individual key/value + pairs, this is crucial since the framework might assume that the task has + timed-out and kill that task. The other way of avoiding this is to set + + mapreduce.task.timeout to a high-enough value (or even zero for no + time-outs).
+ + @param key the key. + @param values the list of values to reduce. + @param output to collect keys and combined values. + @param reporter facility to report progress.]]> +Reducer
s for the job is set by the user via
+ {@link JobConf#setNumReduceTasks(int)}. Reducer
implementations
+ can access the {@link JobConf} for the job via the
+ {@link JobConfigurable#configure(JobConf)} method and initialize themselves.
+ Similarly they can use the {@link Closeable#close()} method for
+ de-initialization.
+
+ Reducer
has 3 primary phases:
Reducer
is input the grouped output of a {@link Mapper}.
+ In the phase the framework, for each Reducer
, fetches the
+ relevant partition of the output of all the Mapper
s, via HTTP.
+
The framework groups Reducer
inputs by key
s
+ (since different Mapper
s may have output the same key) in this
+ stage.
The shuffle and sort phases occur simultaneously i.e. while outputs are + being fetched they are merged.
+ + SecondarySort + +If equivalence rules for keys while grouping the intermediates are
+ different from those for grouping keys before reduction, then one may
+ specify a Comparator
via
+ {@link JobConf#setOutputValueGroupingComparator(Class)}.Since
+ {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to
+ control how intermediate keys are grouped, these can be used in conjunction
+ to simulate secondary sort on values.
In this phase the
+ {@link #reduce(Object, Iterator, OutputCollector, Reporter)}
+ method is called for each <key, (list of values)>
pair in
+ the grouped inputs.
The output of the reduce task is typically written to the + {@link FileSystem} via + {@link OutputCollector#collect(Object, Object)}.
+The output of the Reducer
is not re-sorted.
Example:
++ + @see Mapper + @see Partitioner + @see Reporter + @see MapReduceBase]]> ++ public class MyReducer<K extends WritableComparable, V extends Writable> + extends MapReduceBase implements Reducer<K, V, K, V> { + + static enum MyCounters { NUM_RECORDS } + + private String reduceTaskId; + private int noKeys = 0; + + public void configure(JobConf job) { + reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID); + } + + public void reduce(K key, Iterator<V> values, + OutputCollector<K, V> output, + Reporter reporter) + throws IOException { + + // Process + int noValues = 0; + while (values.hasNext()) { + V value = values.next(); + + // Increment the no. of values for this key + ++noValues; + + // Process the <key, value> pair (assume this takes a while) + // ... + // ... + + // Let the framework know that we are alive, and kicking! + if ((noValues%10) == 0) { + reporter.progress(); + } + + // Process some more + // ... + // ... + + // Output the <key, value> + output.collect(key, value); + } + + // Increment the no. of <key, list of values> pairs processed + ++noKeys; + + // Increment counters + reporter.incrCounter(NUM_RECORDS, 1); + + // Every 100 keys update application-level status + if ((noKeys%100) == 0) { + reporter.setStatus(reduceTaskId + " processed " + noKeys); + } + } + } +
Reporter
+ provided to report progress or just indicate that they are alive. In
+ scenarios where the application takes significant amount of time to
+ process individual key/value pairs, this is crucial since the framework
+ might assume that the task has timed-out and kill that task.
+
+ Applications can also update {@link Counters} via the provided
+ Reporter
.
false
.
+ @throws IOException]]>
+ false
.
+ @throws IOException]]>
+ false
.
+ @throws IOException]]>
+ Clients can get hold of RunningJob
via the {@link JobClient}
+ and then query the running-job for details such as name, configuration,
+ progress etc.
false
otherwise.]]>
+ false
otherwise.]]>
+ This feature can be used when map/reduce tasks crashes deterministically on + certain input. This happens due to bugs in the map/reduce function. The usual + course would be to fix these bugs. But sometimes this is not possible; + perhaps the bug is in third party libraries for which the source code is + not available. Due to this, the task never reaches to completion even with + multiple attempts and complete data for that task is lost.
+ +With this feature, only a small portion of data is lost surrounding + the bad record, which may be acceptable for some user applications. + see {@link SkipBadRecords#setMapperMaxSkipRecords(Configuration, long)}
+ +The skipping mode gets kicked off after certain no of failures + see {@link SkipBadRecords#setAttemptsToStartSkipping(Configuration, int)}
+ +In the skipping mode, the map/reduce task maintains the record range which + is getting processed at all times. Before giving the input to the + map/reduce function, it sends this record range to the Task tracker. + If task crashes, the Task tracker knows which one was the last reported + range. On further attempts that range get skipped.
]]> ++ TaskAttemptID.getTaskAttemptIDsPattern(null, null, true, 1, null); ++ which will return : +
"attempt_[^_]*_[0-9]*_m_000001_[0-9]*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param isMap whether the tip is a map, or null + @param taskId taskId number, or null + @param attemptId the task attempt number, or null + @return a regex pattern matching TaskAttemptIDs]]> +
+ TaskAttemptID.getTaskAttemptIDsPattern(null, null, TaskType.MAP, 1, null); ++ which will return : +
"attempt_[^_]*_[0-9]*_m_000001_[0-9]*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param type the {@link TaskType} + @param taskId taskId number, or null + @param attemptId the task attempt number, or null + @return a regex pattern matching TaskAttemptIDs]]> +
attempt_200707121733_0003_m_000005_0
, which represents the
+ zeroth task attempt for the fifth map task in the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse TaskAttemptID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskID]]> +
+ TaskID.getTaskIDsPattern(null, null, true, 1); ++ which will return : +
"task_[^_]*_[0-9]*_m_000001*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param isMap whether the tip is a map, or null + @param taskId taskId number, or null + @return a regex pattern matching TaskIDs + @deprecated Use {@link TaskID#getTaskIDsPattern(String, Integer, TaskType, + Integer)}]]> +
+ TaskID.getTaskIDsPattern(null, null, true, 1); ++ which will return : +
"task_[^_]*_[0-9]*_m_000001*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param type the {@link TaskType}, or null + @param taskId taskId number, or null + @return a regex pattern matching TaskIDs]]> +
task_200707121733_0003_m_000005
, which represents the
+ fifth map task in the third job running at the jobtracker
+ started at 200707121733
.
+ + Applications should never construct or parse TaskID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskAttemptID]]> +
) }]]> +
+ For the added Mapper the configuration given for it,
+ mapperConf
, have precedence over the job's JobConf. This
+ precedence is in effect when the task is running.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the chain +
+
+ @param job job's JobConf to add the Mapper class.
+ @param klass the Mapper class to add.
+ @param inputKeyClass mapper input key class.
+ @param inputValueClass mapper input value class.
+ @param outputKeyClass mapper output key class.
+ @param outputValueClass mapper output value class.
+ @param byValue indicates if key/values should be passed by value
+ to the next Mapper in the chain, if any.
+ @param mapperConf a JobConf with the configuration for the Mapper
+ class. It is recommended to use a JobConf without default values using the
+ JobConf(boolean loadDefaults)
constructor with FALSE.]]>
+
super.configure(...)
should be
+ invoked at the beginning of the overwriter method.]]>
+ super.close()
should be
+ invoked at the end of the overwriter method.]]>
+ + The key functionality of this feature is that the Mappers in the chain do not + need to be aware that they are executed in a chain. This enables having + reusable specialized Mappers that can be combined to perform composite + operations within a single task. +
+ Special care has to be taken when creating chains that the key/values output + by a Mapper are valid for the following Mapper in the chain. It is assumed + all Mappers and the Reduce in the chain use maching output and input key and + value classes as no conversion is done by the chaining code. +
+ Using the ChainMapper and the ChainReducer classes is possible to compose
+ Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the chain. +
+ ChainMapper usage pattern: +
+
+ ... + conf.setJobName("chain"); + conf.setInputFormat(TextInputFormat.class); + conf.setOutputFormat(TextOutputFormat.class); + + JobConf mapAConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class, + Text.class, Text.class, true, mapAConf); + + JobConf mapBConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, mapBConf); + + JobConf reduceConf = new JobConf(false); + ... + ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class, + Text.class, Text.class, true, reduceConf); + + ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, null); + + ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class, + LongWritable.class, LongWritable.class, true, null); + + FileInputFormat.setInputPaths(conf, inDir); + FileOutputFormat.setOutputPath(conf, outDir); + ... + + JobClient jc = new JobClient(conf); + RunningJob job = jc.submitJob(conf); + ... +]]> +
+ For the added Reducer the configuration given for it,
+ reducerConf
, have precedence over the job's JobConf. This
+ precedence is in effect when the task is running.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the
+ ChainReducer, this is done by the setReducer or the addMapper for the last
+ element in the chain.
+
+ @param job job's JobConf to add the Reducer class.
+ @param klass the Reducer class to add.
+ @param inputKeyClass reducer input key class.
+ @param inputValueClass reducer input value class.
+ @param outputKeyClass reducer output key class.
+ @param outputValueClass reducer output value class.
+ @param byValue indicates if key/values should be passed by value
+ to the next Mapper in the chain, if any.
+ @param reducerConf a JobConf with the configuration for the Reducer
+ class. It is recommended to use a JobConf without default values using the
+ JobConf(boolean loadDefaults)
constructor with FALSE.]]>
+
+ For the added Mapper the configuration given for it,
+ mapperConf
, have precedence over the job's JobConf. This
+ precedence is in effect when the task is running.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the
+ ChainMapper, this is done by the addMapper for the last mapper in the chain
+ .
+
+ @param job chain job's JobConf to add the Mapper class.
+ @param klass the Mapper class to add.
+ @param inputKeyClass mapper input key class.
+ @param inputValueClass mapper input value class.
+ @param outputKeyClass mapper output key class.
+ @param outputValueClass mapper output value class.
+ @param byValue indicates if key/values should be passed by value
+ to the next Mapper in the chain, if any.
+ @param mapperConf a JobConf with the configuration for the Mapper
+ class. It is recommended to use a JobConf without default values using the
+ JobConf(boolean loadDefaults)
constructor with FALSE.]]>
+
super.configure(...)
should be
+ invoked at the beginning of the overwriter method.]]>
+ map(...)
methods of the Mappers in the chain.]]>
+ super.close()
should be
+ invoked at the end of the overwriter method.]]>
+ + The key functionality of this feature is that the Mappers in the chain do not + need to be aware that they are executed after the Reducer or in a chain. + This enables having reusable specialized Mappers that can be combined to + perform composite operations within a single task. +
+ Special care has to be taken when creating chains that the key/values output + by a Mapper are valid for the following Mapper in the chain. It is assumed + all Mappers and the Reduce in the chain use maching output and input key and + value classes as no conversion is done by the chaining code. +
+ Using the ChainMapper and the ChainReducer classes is possible to compose
+ Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the + ChainReducer, this is done by the setReducer or the addMapper for the last + element in the chain. +
+ ChainReducer usage pattern: +
+
+ ... + conf.setJobName("chain"); + conf.setInputFormat(TextInputFormat.class); + conf.setOutputFormat(TextOutputFormat.class); + + JobConf mapAConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class, + Text.class, Text.class, true, mapAConf); + + JobConf mapBConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, mapBConf); + + JobConf reduceConf = new JobConf(false); + ... + ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class, + Text.class, Text.class, true, reduceConf); + + ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, null); + + ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class, + LongWritable.class, LongWritable.class, true, null); + + FileInputFormat.setInputPaths(conf, inDir); + FileOutputFormat.setOutputPath(conf, outDir); + ... + + JobClient jc = new JobClient(conf); + RunningJob job = jc.submitJob(conf); + ... +]]> +
CombineFileSplit
's.
+ @see CombineFileSplit]]>
+ SequenceFileInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ TextInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ false
+ if it is single. If the name output is not defined it returns
+ false
]]>
+ + MultipleOutputs supports counters, by default the are disabled. + The counters group is the {@link MultipleOutputs} class name. +
+ The names of the counters are the same as the named outputs. For multi + named outputs the name of the counter is the concatenation of the named + output, and underscore '_' and the multiname. + + @param conf job conf to enableadd the named output. + @param enabled indicates if the counters will be enabled or not.]]> ++ MultipleOutputs supports counters, by default the are disabled. + The counters group is the {@link MultipleOutputs} class name. +
+ The names of the counters are the same as the named outputs. For multi + named outputs the name of the counter is the concatenation of the named + output, and underscore '_' and the multiname. + + + @param conf job conf to enableadd the named output. + @return TRUE if the counters are enabled, FALSE if they are disabled.]]> +super.close()
at the
+ end of their close()
+
+ @throws java.io.IOException thrown if any of the MultipleOutput files
+ could not be closed properly.]]>
+ map()
and reduce()
methods of the
+ Mapper
and Reducer
implementations.
+
+ Each additional output, or named output, may be configured with its own
+ OutputFormat
, with its own key class and with its own value
+ class.
+
+ A named output can be a single file or a multi file. The later is referred as + a multi named output. +
+ A multi named output is an unbound set of files all sharing the same
+ OutputFormat
, key class and value class configuration.
+
+ When named outputs are used within a Mapper
implementation,
+ key/values written to a name output are not part of the reduce phase, only
+ key/values written to the job OutputCollector
are part of the
+ reduce phase.
+
+ MultipleOutputs supports counters, by default the are disabled. The counters + group is the {@link MultipleOutputs} class name. +
+ The names of the counters are the same as the named outputs. For multi + named outputs the name of the counter is the concatenation of the named + output, and underscore '_' and the multiname. ++ Job configuration usage pattern is: +
+ + JobConf conf = new JobConf(); + + conf.setInputPath(inDir); + FileOutputFormat.setOutputPath(conf, outDir); + + conf.setMapperClass(MOMap.class); + conf.setReducerClass(MOReduce.class); + ... + + // Defines additional single text based output 'text' for the job + MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, + LongWritable.class, Text.class); + + // Defines additional multi sequencefile based output 'sequence' for the + // job + MultipleOutputs.addMultiNamedOutput(conf, "seq", + SequenceFileOutputFormat.class, + LongWritable.class, Text.class); + ... + + JobClient jc = new JobClient(); + RunningJob job = jc.submitJob(conf); + + ... ++
+ Job configuration usage pattern is: +
+ + public class MOReduce implements + Reducer<WritableComparable, Writable> { + private MultipleOutputs mos; + + public void configure(JobConf conf) { + ... + mos = new MultipleOutputs(conf); + } + + public void reduce(WritableComparable key, Iterator<Writable> values, + OutputCollector output, Reporter reporter) + throws IOException { + ... + mos.getCollector("text", reporter).collect(key, new Text("Hello")); + mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye")); + mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau")); + ... + } + + public void close() throws IOException { + mos.close(); + ... + } + + } +]]> +
+ Map implementations using this MapRunnable must be thread-safe. +
+ The Map-Reduce job has to be configured to use this MapRunnable class (using
+ the JobConf.setMapRunnerClass method) and
+ the number of threads the thread-pool can use with the
+ mapred.map.multithreadedrunner.threads
property, its default
+ value is 10 threads.
+
]]> +
ClusterMetrics
provides clients with information such as:
+ Clients can query for the latest ClusterMetrics
, via
+ {@link Cluster#getClusterStatus()}.
Counters
represent global counters, defined either by the
+ Map-Reduce framework or applications. Each Counter
is named by
+ an {@link Enum} and has a long for the value.
+
+ Counters
are bunched into Groups, each comprising of
+ counters from a particular Enum
class.]]>
+
Counters
holds per job/task counters, defined either by the
+ Map-Reduce framework or applications. Each Counter
can be of
+ any {@link Enum} type.
+
+ Counters
are bunched into {@link CounterGroup}s, each
+ comprising of counters from a particular Enum
class.]]>
+
Note: The split is a logical split of the inputs and the + input files are not physically split into chunks. For e.g. a split could + be <input-file-path, start, offset> tuple. The InputFormat + also creates the {@link RecordReader} to read the {@link InputSplit}. + + @param context job configuration. + @return an array of {@link InputSplit}s for the job.]]> +
The Map-Reduce framework relies on the InputFormat
of the
+ job to:
+
InputSplit
for processing by
+ the {@link Mapper}.
+ The default behavior of file-based {@link InputFormat}s, typically + sub-classes of {@link FileInputFormat}, is to split the + input into logical {@link InputSplit}s based on the total size, in + bytes, of the input files. However, the {@link FileSystem} blocksize of + the input files is treated as an upper bound for input splits. A lower bound + on the split size can be set via + + mapreduce.input.fileinputformat.split.minsize.
+ +Clearly, logical splits based on input-size is insufficient for many
+ applications since record boundaries are to respected. In such cases, the
+ application has to also implement a {@link RecordReader} on whom lies the
+ responsibility to respect record-boundaries and present a record-oriented
+ view of the logical InputSplit
to the individual task.
+
+ @see InputSplit
+ @see RecordReader
+ @see FileInputFormat]]>
+
Typically, it presents a byte-oriented view on the input and is the + responsibility of {@link RecordReader} of the job to process this and present + a record-oriented view. + + @see InputFormat + @see RecordReader]]> +
Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ A Cluster will be created from the conf parameter only when it's needed.
+
+ @param conf the configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param conf the configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param status job status
+ @param conf job configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param ignored
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException
+ @deprecated Use {@link #getInstance()}]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param ignored
+ @param conf job configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException
+ @deprecated Use {@link #getInstance(Configuration)}]]>
+ false
.
+ @throws IOException]]>
+ false
.
+ @throws IOException]]>
+ false
.]]>
+ false
.]]>
+ false
.]]>
+ + Normally the user creates the application, describes various facets of the + job via {@link Job} and then submits the job and monitor its progress.
+ +Here is an example on how to submit a job:
+]]> ++ // Create a new Job + Job job = Job.getInstance(); + job.setJarByClass(MyJob.class); + + // Specify various job-specific parameters + job.setJobName("myjob"); + + job.setInputPath(new Path("in")); + job.setOutputPath(new Path("out")); + + job.setMapperClass(MyJob.MyMapper.class); + job.setReducerClass(MyJob.MyReducer.class); + + // Submit the job, then poll for progress until the job is complete + job.waitForCompletion(true); +
job_200707121733_0003
, which represents the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse JobID strings, but rather + use appropriate constructors or {@link #forName(String)} method. + + @see TaskID + @see TaskAttemptID]]> +
The Hadoop Map-Reduce framework spawns one map task for each
+ {@link InputSplit} generated by the {@link InputFormat} for the job.
+ Mapper
implementations can access the {@link Configuration} for
+ the job via the {@link JobContext#getConfiguration()}.
+
+
The framework first calls
+ {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
+ {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}
+ for each key/value pair in the InputSplit
. Finally
+ {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)} is called.
All intermediate values associated with a given output key are + subsequently grouped by the framework, and passed to a {@link Reducer} to + determine the final output. Users can control the sorting and grouping by + specifying two key {@link RawComparator} classes.
+ +The Mapper
outputs are partitioned per
+ Reducer
. Users can control which keys (and hence records) go to
+ which Reducer
by implementing a custom {@link Partitioner}.
+
+
Users can optionally specify a combiner
, via
+ {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
+ intermediate outputs, which helps to cut down the amount of data transferred
+ from the Mapper
to the Reducer
.
+
+
Applications can specify if and how the intermediate
+ outputs are to be compressed and which {@link CompressionCodec}s are to be
+ used via the Configuration
.
If the job has zero
+ reduces then the output of the Mapper
is directly written
+ to the {@link OutputFormat} without sorting by keys.
Example:
++ ++ public class TokenCounterMapper + extends Mapper<Object, Text, Text, IntWritable>{ + + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + + public void map(Object key, Text value, Context context) throws IOException, InterruptedException { + StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + context.write(word, one); + } + } + } +
Applications may override the
+ {@link #run(org.apache.hadoop.mapreduce.Mapper.Context)} method to exert
+ greater control on map processing e.g. multi-threaded Mapper
s
+ etc.
false
otherwise
+ @see #recoverTask(TaskAttemptContext)
+ @deprecated Use {@link #isRecoverySupported(JobContext)} instead.]]>
+ false
otherwise
+ @throws IOException]]>
+ false
otherwise
+ @throws IOException
+ @see #recoverTask(TaskAttemptContext)]]>
+ The Map-Reduce framework relies on the OutputCommitter
of
+ the job to:
+
The Map-Reduce framework relies on the OutputFormat
of the
+ job to:
+
key
.]]>
+ Partitioner
controls the partitioning of the keys of the
+ intermediate map-outputs. The key (or a subset of the key) is used to derive
+ the partition, typically by a hash function. The total number of partitions
+ is the same as the number of reduce tasks for the job. Hence this controls
+ which of the m
reduce tasks the intermediate key (and hence the
+ record) is sent for reduction.
+
+ Note: A Partitioner
is created only when there are multiple
+ reducers.
Note: If you require your Partitioner class to obtain the Job's + configuration object, implement the {@link Configurable} interface.
+ + @see Reducer]]> +RecordWriter
implementations write the job outputs to the
+ {@link FileSystem}.
+
+ @see OutputFormat]]>
+
Reducer
implementations
+ can access the {@link Configuration} for the job via the
+ {@link JobContext#getConfiguration()} method.
+
+ Reducer
has 3 primary phases:
The Reducer
copies the sorted output from each
+ {@link Mapper} using HTTP across the network.
The framework merge sorts Reducer
inputs by
+ key
s
+ (since different Mapper
s may have output the same key).
The shuffle and sort phases occur simultaneously i.e. while outputs are + being fetched they are merged.
+ + SecondarySort + +To achieve a secondary sort on the values returned by the value + iterator, the application should extend the key with the secondary + key and define a grouping comparator. The keys will be sorted using the + entire key, but will be grouped using the grouping comparator to decide + which keys and values are sent in the same call to reduce.The grouping + comparator is specified via + {@link Job#setGroupingComparatorClass(Class)}. The sort order is + controlled by + {@link Job#setSortComparatorClass(Class)}.
+ + + For example, say that you want to find duplicate web pages and tag them + all with the url of the "best" known example. You would set up the job + like: +In this phase the
+ {@link #reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)}
+ method is called for each <key, (collection of values)>
in
+ the sorted inputs.
The output of the reduce task is typically written to a + {@link RecordWriter} via + {@link Context#write(Object, Object)}.
+The output of the Reducer
is not re-sorted.
Example:
++ + @see Mapper + @see Partitioner]]> ++ public class IntSumReducer<Key> extends Reducer<Key,IntWritable, + Key,IntWritable> { + private IntWritable result = new IntWritable(); + + public void reduce(Key key, Iterable<IntWritable> values, + Context context) throws IOException, InterruptedException { + int sum = 0; + for (IntWritable val : values) { + sum += val.get(); + } + result.set(sum); + context.write(key, result); + } + } +
Counter
for the given counterName
]]>
+ counterName
.
+ @param counterName counter name
+ @return the Counter
for the given groupName
and
+ counterName
]]>
+ attempt_200707121733_0003_m_000005_0
, which represents the
+ zeroth task attempt for the fifth map task in the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse TaskAttemptID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskID]]> +
task_200707121733_0003_m_000005
, which represents the
+ fifth map task in the third job running at the jobtracker
+ started at 200707121733
.
+ + Applications should never construct or parse TaskID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskAttemptID]]> +
mapperConf
, have precedence over the job's Configuration. This
+ precedence is in effect when the task is running.
+
+ + IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the chain +
+ + @param job + The job. + @param klass + the Mapper class to add. + @param inputKeyClass + mapper input key class. + @param inputValueClass + mapper input value class. + @param outputKeyClass + mapper output key class. + @param outputValueClass + mapper output value class. + @param mapperConf + a configuration for the Mapper class. It is recommended to use a + Configuration without default values using the +Configuration(boolean loadDefaults)
constructor with
+ FALSE.]]>
+ + The key functionality of this feature is that the Mappers in the chain do not + need to be aware that they are executed in a chain. This enables having + reusable specialized Mappers that can be combined to perform composite + operations within a single task. +
++ Special care has to be taken when creating chains that the key/values output + by a Mapper are valid for the following Mapper in the chain. It is assumed + all Mappers and the Reduce in the chain use matching output and input key and + value classes as no conversion is done by the chaining code. +
+
+ Using the ChainMapper and the ChainReducer classes is possible to compose
+ Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the chain. +
+ ChainMapper usage pattern: ++ +
+ ... + Job = new Job(conf); + + Configuration mapAConf = new Configuration(false); + ... + ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class, + Text.class, Text.class, true, mapAConf); + + Configuration mapBConf = new Configuration(false); + ... + ChainMapper.addMapper(job, BMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, mapBConf); + + ... + + job.waitForComplettion(true); + ... +]]> +
reducerConf
, have precedence over the job's Configuration.
+ This precedence is in effect when the task is running.
+
+ + IMPORTANT: There is no need to specify the output key/value classes for the + ChainReducer, this is done by the setReducer or the addMapper for the last + element in the chain. +
+ + @param job + the job + @param klass + the Reducer class to add. + @param inputKeyClass + reducer input key class. + @param inputValueClass + reducer input value class. + @param outputKeyClass + reducer output key class. + @param outputValueClass + reducer output value class. + @param reducerConf + a configuration for the Reducer class. It is recommended to use a + Configuration without default values using the +Configuration(boolean loadDefaults)
constructor with
+ FALSE.]]>
+ mapperConf
, have precedence over the job's Configuration. This
+ precedence is in effect when the task is running.
+
+ + IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the + chain. +
+ + @param job + The job. + @param klass + the Mapper class to add. + @param inputKeyClass + mapper input key class. + @param inputValueClass + mapper input value class. + @param outputKeyClass + mapper output key class. + @param outputValueClass + mapper output value class. + @param mapperConf + a configuration for the Mapper class. It is recommended to use a + Configuration without default values using the +Configuration(boolean loadDefaults)
constructor with
+ FALSE.]]>
+ + The key functionality of this feature is that the Mappers in the chain do not + need to be aware that they are executed after the Reducer or in a chain. This + enables having reusable specialized Mappers that can be combined to perform + composite operations within a single task. +
++ Special care has to be taken when creating chains that the key/values output + by a Mapper are valid for the following Mapper in the chain. It is assumed + all Mappers and the Reduce in the chain use matching output and input key and + value classes as no conversion is done by the chaining code. +
+ Using the ChainMapper and the ChainReducer classes is possible to
+ compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+ IMPORTANT: There is no need to specify the output key/value classes for the + ChainReducer, this is done by the setReducer or the addMapper for the last + element in the chain. +
+ ChainReducer usage pattern: ++ +
+ ... + Job = new Job(conf); + .... + + Configuration reduceConf = new Configuration(false); + ... + ChainReducer.setReducer(job, XReduce.class, LongWritable.class, Text.class, + Text.class, Text.class, true, reduceConf); + + ChainReducer.addMapper(job, CMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, null); + + ChainReducer.addMapper(job, DMap.class, LongWritable.class, Text.class, + LongWritable.class, LongWritable.class, true, null); + + ... + + job.waitForCompletion(true); + ... +]]> +
+ Implementations are responsible for writing the fields of the object + to PreparedStatement, and reading the fields of the object from the + ResultSet. + +
Example:
+ If we have the following table in the database : ++ CREATE TABLE MyTable ( + counter INTEGER NOT NULL, + timestamp BIGINT NOT NULL, + ); ++ then we can read/write the tuples from/to the table with : +
+ public class MyWritable implements Writable, DBWritable { + // Some data + private int counter; + private long timestamp; + + //Writable#write() implementation + public void write(DataOutput out) throws IOException { + out.writeInt(counter); + out.writeLong(timestamp); + } + + //Writable#readFields() implementation + public void readFields(DataInput in) throws IOException { + counter = in.readInt(); + timestamp = in.readLong(); + } + + public void write(PreparedStatement statement) throws SQLException { + statement.setInt(1, counter); + statement.setLong(2, timestamp); + } + + public void readFields(ResultSet resultSet) throws SQLException { + counter = resultSet.getInt(1); + timestamp = resultSet.getLong(2); + } + } +]]> +
CombineFileSplit
's.
+
+ @see CombineFileSplit]]>
+ SequenceFileInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ TextInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ FileInputFormat
implementations can override this and return
+ false
to ensure that individual input files are never split-up
+ so that {@link Mapper}s process entire files.
+
+ @param context the job context
+ @param filename the file name to check
+ @return is this file splitable?]]>
+ FileInputFormat
is the base class for all file-based
+ InputFormat
s. This provides a generic implementation of
+ {@link #getSplits(JobContext)}.
+
+ Implementations of FileInputFormat
can also override the
+ {@link #isSplitable(JobContext, Path)} method to prevent input files
+ from being split-up in certain situations. Implementations that may
+ deal with non-splittable files must override this method, since
+ the default implementation assumes splitting is always possible.]]>
+ ) }]]> +
+ Mapper implementations using this MapRunnable must be thread-safe. +
+ The Map-Reduce job has to be configured with the mapper to use via + {@link #setMapperClass(Job, Class)} and + the number of thread the thread-pool can use with the + {@link #getNumberOfThreads(JobContext)} method. The default + value is 10 threads. +
]]> +
Mapper.Context
for custom implementations]]>
+ false
otherwise]]>
+ Some applications need to create/write-to side-files, which differ from + the actual job-outputs. + +
In such cases there could be issues with 2 instances of the same TIP + (running simultaneously e.g. speculative tasks) trying to open/write-to the + same file (path) on HDFS. Hence the application-writer will have to pick + unique names per task-attempt (e.g. using the attemptid, say + attempt_200709221812_0001_m_000000_0), not just per TIP.
+ +To get around this the Map-Reduce framework helps the application-writer + out by maintaining a special + ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} + sub-directory for each task-attempt on HDFS where the output of the + task-attempt goes. On successful completion of the task-attempt the files + in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) + are promoted to ${mapreduce.output.fileoutputformat.outputdir}. Of course, the + framework discards the sub-directory of unsuccessful task-attempts. This + is completely transparent to the application.
+ +The application-writer can take advantage of this by creating any + side-files required in a work directory during execution + of his task i.e. via + {@link #getWorkOutputPath(TaskInputOutputContext)}, and + the framework will move them out similarly - thus she doesn't have to pick + unique paths per task-attempt.
+ +The entire discussion holds true for maps of jobs with + reducer=NONE (i.e. 0 reduces) since output of the map, in that case, + goes directly to HDFS.
+ + @return the {@link Path} to the task's temporary output directory + for the map-reduce job.]]> +This method uses the {@link #getUniqueFile} method to make the file name + unique for the task.
+ + @param context the context for the task. + @param name the name for the file. + @param extension the extension for the file + @return a unique path accross all tasks of the job.]]> +close()
]]>
+ OutputFormat
, with its own key class and with its own value
+ class.
+
+
+ + Case two: to write data to different files provided by user +
+ ++ MultipleOutputs supports counters, by default they are disabled. The + counters group is the {@link MultipleOutputs} class name. The names of the + counters are the same as the output name. These count the number records + written to each output name. +
+ + Usage pattern for job submission: ++ + Job job = new Job(); + + FileInputFormat.setInputPath(job, inDir); + FileOutputFormat.setOutputPath(job, outDir); + + job.setMapperClass(MOMap.class); + job.setReducerClass(MOReduce.class); + ... + + // Defines additional single text based output 'text' for the job + MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, + LongWritable.class, Text.class); + + // Defines additional sequence-file based output 'sequence' for the job + MultipleOutputs.addNamedOutput(job, "seq", + SequenceFileOutputFormat.class, + LongWritable.class, Text.class); + ... + + job.waitForCompletion(true); + ... ++
+ Usage in Reducer: +
+ <K, V> String generateFileName(K k, V v) { + return k.toString() + "_" + v.toString(); + } + + public class MOReduce extends + Reducer<WritableComparable, Writable,WritableComparable, Writable> { + private MultipleOutputs mos; + public void setup(Context context) { + ... + mos = new MultipleOutputs(context); + } + + public void reduce(WritableComparable key, Iterator<Writable> values, + Context context) + throws IOException { + ... + mos.write("text", , key, new Text("Hello")); + mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a"); + mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b"); + mos.write(key, new Text("value"), generateFileName(key, new Text("value"))); + ... + } + + public void cleanup(Context) throws IOException { + mos.close(); + ... + } + + } ++ +
+ When used in conjuction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat, + MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat + from the old Hadoop API - ie, output can be written from the Reducer to more than one location. +
+ +
+ Use MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)
to write key and
+ value to a path specified by baseOutputPath
, with no need to specify a named output.
+ Warning: when the baseOutputPath passed to MultipleOutputs.write
+ is a path that resolves outside of the final job output directory, the
+ directory is created immediately and then persists through subsequent
+ task retries, breaking the concept of output committing:
+
+ private MultipleOutputs<Text, Text> out; + + public void setup(Context context) { + out = new MultipleOutputs<Text, Text>(context); + ... + } + + public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { + for (Text t : values) { + out.write(key, t, generateFileName(<parameter list...>)); + } + } + + protected void cleanup(Context context) throws IOException, InterruptedException { + out.close(); + } ++ +
+ Use your own code in generateFileName()
to create a custom path to your results.
+ '/' characters in baseOutputPath
will be translated into directory levels in your file system.
+ Also, append your custom-generated path with "part" or similar, otherwise your output will be -00000, -00001 etc.
+ No call to context.write()
is necessary. See example generateFileName()
code below.
+
+ private String generateFileName(Text k) { + // expect Text k in format "Surname|Forename" + String[] kStr = k.toString().split("\\|"); + + String sName = kStr[0]; + String fName = kStr[1]; + + // example for k = Smith|John + // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc) + return sName + "/" + fName; + } ++ +
+ Using MultipleOutputs in this way will still create zero-sized default output, eg part-00000.
+ To prevent this use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+ instead of job.setOutputFormatClass(TextOutputFormat.class);
in your Hadoop job configuration.
+
The subarray to be used for the partitioning can be defined by means + of the following properties: +
+ +---+---+---+---+---+
+ | B | B | B | B | B |
+ +---+---+---+---+---+
+ 0 1 2 3 4
+ -5 -4 -3 -2 -1
+
+ The first row of numbers gives the position of the offsets 0...5 in
+ the array; the second row gives the corresponding negative offsets.
+ Contrary to Python, the specified subarray has byte i
+ and j
as first and last element, repectively, when
+ i
and j
are the left and right offset.
+
+ For Hadoop programs written in Java, it is advisable to use one of + the following static convenience methods for setting the offsets: +
Reducer.Context
for custom implementations]]>
+ The ResourceManager
responds with a new, monotonically
+ increasing, {@link ApplicationId} which is used by the client to submit
+ a new application.
The ResourceManager
also responds with details such
+ as maximum resource capabilities in the cluster as specified in
+ {@link GetNewApplicationResponse}.
ApplicationId
+ @return response containing the new ApplicationId
to be used
+ to submit an application
+ @throws YarnException
+ @throws IOException
+ @see #submitApplication(SubmitApplicationRequest)]]>
+ ResourceManager.
+
+ The client is required to provide details such as queue,
+ {@link Resource} required to run the ApplicationMaster
,
+ the equivalent of {@link ContainerLaunchContext} for launching
+ the ApplicationMaster
etc. via the
+ {@link SubmitApplicationRequest}.
Currently the ResourceManager
sends an immediate (empty)
+ {@link SubmitApplicationResponse} on accepting the submission and throws
+ an exception if it rejects the submission. However, this call needs to be
+ followed by {@link #getApplicationReport(GetApplicationReportRequest)}
+ to make sure that the application gets properly submitted - obtaining a
+ {@link SubmitApplicationResponse} from ResourceManager doesn't guarantee
+ that RM 'remembers' this application beyond failover or restart. If RM
+ failover or RM restart happens before ResourceManager saves the
+ application's state successfully, the subsequent
+ {@link #getApplicationReport(GetApplicationReportRequest)} will throw
+ a {@link ApplicationNotFoundException}. The Clients need to re-submit
+ the application with the same {@link ApplicationSubmissionContext} when
+ it encounters the {@link ApplicationNotFoundException} on the
+ {@link #getApplicationReport(GetApplicationReportRequest)} call.
During the submission process, it checks whether the application + already exists. If the application exists, it will simply return + SubmitApplicationResponse
+ + In secure mode,the ResourceManager
verifies access to
+ queues etc. before accepting the application submission.
ResourceManager
to fail an application attempt.
+
+ The client, via {@link FailApplicationAttemptRequest} provides the + {@link ApplicationAttemptId} of the attempt to be failed.
+ + In secure mode,the ResourceManager
verifies access to the
+ application, queue etc. before failing the attempt.
Currently, the ResourceManager
returns an empty response
+ on success and throws an exception on rejecting the request.
ResourceManager
returns an empty response
+ on success and throws an exception on rejecting the request
+ @throws YarnException
+ @throws IOException
+ @see #getQueueUserAcls(GetQueueUserAclsInfoRequest)]]>
+ ResourceManager
to abort submitted application.
+
+ The client, via {@link KillApplicationRequest} provides the + {@link ApplicationId} of the application to be aborted.
+ + In secure mode,the ResourceManager
verifies access to the
+ application, queue etc. before terminating the application.
Currently, the ResourceManager
returns an empty response
+ on success and throws an exception on rejecting the request.
ResourceManager
returns an empty response
+ on success and throws an exception on rejecting the request
+ @throws YarnException
+ @throws IOException
+ @see #getQueueUserAcls(GetQueueUserAclsInfoRequest)]]>
+ ResourceManager
.
+
+ The ResourceManager
responds with a
+ {@link GetClusterMetricsResponse} which includes the
+ {@link YarnClusterMetrics} with details such as number of current
+ nodes in the cluster.
ResourceManager
.
+
+ The ResourceManager
responds with a
+ {@link GetClusterNodesResponse} which includes the
+ {@link NodeReport} for all the nodes in the cluster.
ResourceManager
.
+
+ The client, via {@link GetQueueInfoRequest}, can ask for details such + as used/total resources, child queues, running applications etc.
+ + In secure mode,the ResourceManager
verifies access before
+ providing the information.
ResourceManager
.
+
+
+ The ResourceManager
responds with queue acls for all
+ existing queues.
The ResourceManager
responds with a new, unique,
+ {@link ReservationId} which is used by the client to submit
+ a new reservation.
ReservationId
+ @return response containing the new ReservationId
to be used
+ to submit a new reservation
+ @throws YarnException if the reservation system is not enabled.
+ @throws IOException on IO failures.
+ @see #submitReservation(ReservationSubmissionRequest)]]>
+ + The client packages all details of its request in a + {@link ReservationSubmissionRequest} object. This contains information + about the amount of capacity, temporal constraints, and concurrency needs. + Furthermore, the reservation might be composed of multiple stages, with + ordering dependencies among them. +
+ ++ In order to respond, a new admission control component in the + {@code ResourceManager} performs an analysis of the resources that have + been committed over the period of time the user is requesting, verify that + the user requests can be fulfilled, and that it respect a sharing policy + (e.g., {@code CapacityOverTimePolicy}). Once it has positively determined + that the ReservationSubmissionRequest is satisfiable the + {@code ResourceManager} answers with a + {@link ReservationSubmissionResponse} that include a non-null + {@link ReservationId}. Upon failure to find a valid allocation the response + is an exception with the reason. + + On application submission the client can use this {@link ReservationId} to + obtain access to the reserved resources. +
+ ++ The system guarantees that during the time-range specified by the user, the + reservationID will be corresponding to a valid reservation. The amount of + capacity dedicated to such queue can vary overtime, depending of the + allocation that has been determined. But it is guaranteed to satisfy all + the constraint expressed by the user in the + {@link ReservationSubmissionRequest}. +
+ + @param request the request to submit a new Reservation + @return response the {@link ReservationId} on accepting the submission + @throws YarnException if the request is invalid or reservation cannot be + created successfully + @throws IOException]]> ++ The allocation is attempted by virtually substituting all previous + allocations related to this Reservation with new ones, that satisfy the new + {@link ReservationUpdateRequest}. Upon success the previous allocation is + substituted by the new one, and on failure (i.e., if the system cannot find + a valid allocation for the updated request), the previous allocation + remains valid. + + The {@link ReservationId} is not changed, and applications currently + running within this reservation will automatically receive the resources + based on the new allocation. +
+ + @param request to update an existing Reservation (the ReservationRequest + should refer to an existing valid {@link ReservationId}) + @return response empty on successfully updating the existing reservation + @throws YarnException if the request is invalid or reservation cannot be + updated successfully + @throws IOException]]> +ResourceManager
to signal a container. For example,
+ the client can send command OUTPUT_THREAD_DUMP to dump threads of the
+ container.
+
+ The client, via {@link SignalContainerRequest} provides the + id of the container and the signal command.
+ + In secure mode,the ResourceManager
verifies access to the
+ application before signaling the container.
+ The user needs to have MODIFY_APP
permission.
Currently, the ResourceManager
returns an empty response
+ on success and throws an exception on rejecting the request.
ResourceManager
returns an empty response
+ on success and throws an exception on rejecting the request
+ @throws YarnException
+ @throws IOException]]>
+ ResourceManager
+ to submit/abort jobs and to get information on applications, cluster metrics,
+ nodes, queues and ACLs.]]>
+ ApplicationHistoryServer
to
+ get the information of completed applications etc.
+ ]]>
+ ApplicationMaster
to register with
+ the ResourceManager
.
+
+
+
+ The ApplicationMaster
needs to provide details such as RPC
+ Port, HTTP tracking url etc. as specified in
+ {@link RegisterApplicationMasterRequest}.
+
+ The ResourceManager
responds with critical details such as
+ maximum resource capabilities in the cluster as specified in
+ {@link RegisterApplicationMasterResponse}.
+
+ Re-register is only allowed for Unmanaged Application Master
+ (UAM) HA, with
+ {@link org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext#getKeepContainersAcrossApplicationAttempts()}
+ set to true.
+
ApplicationMaster
to notify the
+ ResourceManager
about its completion (success or failed).
+
+ The ApplicationMaster
has to provide details such as
+ final state, diagnostics (in case of failures) etc. as specified in
+ {@link FinishApplicationMasterRequest}.
The ResourceManager
responds with
+ {@link FinishApplicationMasterResponse}.
ApplicationMaster
and the
+ ResourceManager
.
+
+
+
+ The ApplicationMaster
uses this interface to provide a list of
+ {@link ResourceRequest} and returns unused {@link Container} allocated to
+ it via {@link AllocateRequest}. Optionally, the
+ ApplicationMaster
can also blacklist resources which
+ it doesn't want to use.
+
+ This also doubles up as a heartbeat to let the
+ ResourceManager
know that the ApplicationMaster
+ is alive. Thus, applications should periodically make this call to be kept
+ alive. The frequency depends on
+ {@link YarnConfiguration#RM_AM_EXPIRY_INTERVAL_MS} which defaults to
+ {@link YarnConfiguration#DEFAULT_RM_AM_EXPIRY_INTERVAL_MS}.
+
+ The ResourceManager
responds with list of allocated
+ {@link Container}, status of completed containers and headroom information
+ for the application.
+
+ The ApplicationMaster
can use the available headroom
+ (resources) to decide how to utilized allocated resources and make informed
+ decisions about future resource requests.
+
ApplicationMaster
+ and the ResourceManager
.
+
+ This is used by the ApplicationMaster
to register/unregister
+ and to request and obtain resources in the cluster from the
+ ResourceManager
.
SharedCacheManager.
The client uses a checksum to identify the
+ resource and an {@link ApplicationId} to identify which application will be
+ using the resource.
+
+
+
+ The SharedCacheManager
responds with whether or not the
+ resource exists in the cache. If the resource exists, a Path
+ to the resource in the shared cache is returned. If the resource does not
+ exist, the response is empty.
+
SharedCacheManager.
This method is called once an application
+ is no longer using a claimed resource in the shared cache. The client uses
+ a checksum to identify the resource and an {@link ApplicationId} to
+ identify which application is releasing the resource.
+
+
+ + Note: This method is an optimization and the client is not required to call + it for correctness. +
+ +
+ Currently the SharedCacheManager
sends an empty response.
+
SharedCacheManager
to claim
+ and release resources in the shared cache.
+ ]]>
+ ApplicationMaster
provides a list of
+ {@link StartContainerRequest}s to a NodeManager
to
+ start {@link Container}s allocated to it using this interface.
+
+
+
+ The ApplicationMaster
has to provide details such as allocated
+ resource capability, security tokens (if enabled), command to be executed
+ to start the container, environment for the process, necessary
+ binaries/jar/shared-objects etc. via the {@link ContainerLaunchContext} in
+ the {@link StartContainerRequest}.
+
+ The NodeManager
sends a response via
+ {@link StartContainersResponse} which includes a list of
+ {@link Container}s of successfully launched {@link Container}s, a
+ containerId-to-exception map for each failed {@link StartContainerRequest} in
+ which the exception indicates errors from per container and a
+ allServicesMetaData map between the names of auxiliary services and their
+ corresponding meta-data. Note: None-container-specific exceptions will
+ still be thrown by the API method itself.
+
+ The ApplicationMaster
can use
+ {@link #getContainerStatuses(GetContainerStatusesRequest)} to get updated
+ statuses of the to-be-launched or launched containers.
+
ApplicationMaster
requests a NodeManager
to
+ stop a list of {@link Container}s allocated to it using this
+ interface.
+
+
+
+ The ApplicationMaster
sends a {@link StopContainersRequest}
+ which includes the {@link ContainerId}s of the containers to be stopped.
+
+ The NodeManager
sends a response via
+ {@link StopContainersResponse} which includes a list of {@link ContainerId}
+ s of successfully stopped containers, a containerId-to-exception map for
+ each failed request in which the exception indicates errors from per
+ container. Note: None-container-specific exceptions will still be thrown by
+ the API method itself. ApplicationMaster
can use
+ {@link #getContainerStatuses(GetContainerStatusesRequest)} to get updated
+ statuses of the containers.
+
ApplicationMaster
to request for current
+ statuses of Container
s from the NodeManager
.
+
+
+
+ The ApplicationMaster
sends a
+ {@link GetContainerStatusesRequest} which includes the {@link ContainerId}s
+ of all containers whose statuses are needed.
+
+ The NodeManager
responds with
+ {@link GetContainerStatusesResponse} which includes a list of
+ {@link ContainerStatus} of the successfully queried containers and a
+ containerId-to-exception map for each failed request in which the exception
+ indicates errors from per container. Note: None-container-specific
+ exceptions will still be thrown by the API method itself.
+
ContainerStatus
es of containers with
+ the specified ContainerId
s
+ @return response containing the list of ContainerStatus
of the
+ successfully queried containers and a containerId-to-exception map
+ for failed requests.
+
+ @throws YarnException
+ @throws IOException]]>
+ ApplicationMaster
to request for
+ resource increase of running containers on the NodeManager
.
+
+
+ @param request
+ request to increase resource of a list of containers
+ @return response which includes a list of containerIds of containers
+ whose resource has been successfully increased and a
+ containerId-to-exception map for failed requests.
+
+ @throws YarnException
+ @throws IOException]]>
+ ApplicationMaster
to request for
+ resource update of running containers on the NodeManager
.
+
+
+ @param request
+ request to update resource of a list of containers
+ @return response which includes a list of containerIds of containers
+ whose resource has been successfully updated and a
+ containerId-to-exception map for failed requests.
+
+ @throws YarnException Exception specific to YARN
+ @throws IOException IOException thrown from NodeManager]]>
+ ApplicationMaster
and a
+ NodeManager
to start/stop and increase resource of containers
+ and to get status of running containers.
+
+ If security is enabled the NodeManager
verifies that the
+ ApplicationMaster
has truly been allocated the container
+ by the ResourceManager
and also verifies all interactions such
+ as stopping the container or obtaining status information for the container.
+
ResourceManager
about the application's resource requirements.
+ @return the list of ResourceRequest
+ @see ResourceRequest]]>
+ ResourceManager
about the application's resource requirements.
+ @param resourceRequests list of ResourceRequest
to update the
+ ResourceManager
about the application's
+ resource requirements
+ @see ResourceRequest]]>
+ ApplicationMaster
.
+ @return list of ContainerId
of containers being
+ released by the ApplicationMaster
]]>
+ ApplicationMaster
+ @param releaseContainers list of ContainerId
of
+ containers being released by the
+ ApplicationMaster
]]>
+ ApplicationMaster
.
+ @return the ResourceBlacklistRequest
being sent by the
+ ApplicationMaster
+ @see ResourceBlacklistRequest]]>
+ ResourceManager
about the blacklist additions and removals
+ per the ApplicationMaster
.
+
+ @param resourceBlacklistRequest the ResourceBlacklistRequest
+ to inform the ResourceManager
about
+ the blacklist additions and removals
+ per the ApplicationMaster
+ @see ResourceBlacklistRequest]]>
+ ApplicationMaster
.]]>
+ UpdateContainerRequest
for
+ containers to be updated]]>
+ ApplicationMaster
.]]>
+ ResourceManager
about the application's resource
+ requirements.]]>
+ ApplicationMaster
to the
+ ResourceManager
to obtain resources in the cluster.
+
+ The request includes: +
ResourceManager
about the application's
+ resource requirements.
+ ResourceManager
about the change in
+ requirements of running containers.
+ responseId
of the request
+ @return {@link AllocateRequestBuilder}]]>
+ progress
of the request
+ @return {@link AllocateRequestBuilder}]]>
+ askList
of the request
+ @return {@link AllocateRequestBuilder}]]>
+ releaseList
of the request
+ @return {@link AllocateRequestBuilder}]]>
+ resourceBlacklistRequest
of the request
+ @return {@link AllocateRequestBuilder}]]>
+ updateRequests
of the request
+ @return {@link AllocateRequestBuilder}]]>
+ SchedulingRequest
of the request
+ @return {@link AllocateRequestBuilder}]]>
+ ApplicationMaster
to take some action then it will send an
+ AMCommand to the ApplicationMaster
. See AMCommand
+ for details on commands and actions for them.
+ @return AMCommand
if the ApplicationMaster
should
+ take action, null
otherwise
+ @see AMCommand]]>
+ Container
by the
+ ResourceManager
.
+ @return list of newly allocated Container
]]>
+ NodeReport
s. Updates could
+ be changes in health, availability etc of the nodes.
+ @return The delta of updated nodes since the last response]]>
+ + AM will receive one NMToken per NM irrespective of the number of containers + issued on same NM. AM is expected to store these tokens until issued a + new token for the same NM. + @return list of NMTokens required for communicating with NM]]> +
ResourceManager
from previous application attempts.]]>
+ ApplicationMaster
during resource negotiation.
+ + The response, includes: +
Application Master
to the
+ Node Manager
to change the resource quota of a container.
+
+ @see ContainerManagementProtocol#updateContainer(ContainerUpdateRequest)]]>
+ NodeManager
to the
+ ApplicationMaster
when asked to update container resource.
+
+
+ @see ContainerManagementProtocol#updateContainer(ContainerUpdateRequest)]]>
+ ApplicationAttemptId
of the attempt.]]>
+ ResourceManager
+ to fail an application attempt.
+
+ The request includes the {@link ApplicationAttemptId} of the attempt to + be failed.
+ + @see ApplicationClientProtocol#failApplicationAttempt(FailApplicationAttemptRequest)]]> +ResourceManager
to the client
+ failing an application attempt.
+
+ Currently it's empty.
+ + @see ApplicationClientProtocol#failApplicationAttempt(FailApplicationAttemptRequest)]]> +ApplicationMaster
.
+ @return final state of the ApplicationMaster
]]>
+ ApplicationMaster
+ @param finalState final state of the ApplicationMaster
]]>
+ ApplicationMaster
.
+ This url if contains scheme then that will be used by resource manager
+ web application proxy otherwise it will default to http.
+ @return tracking URLfor the ApplicationMaster
]]>
+ ApplicationMaster
.
+ This is the web-URL to which ResourceManager or web-application proxy will
+ redirect client/users once the application is finished and the
+ ApplicationMaster
is gone.
+ + If the passed url has a scheme then that will be used by the + ResourceManager and web-application proxy, otherwise the scheme will + default to http. +
++ Empty, null, "N/A" strings are all valid besides a real URL. In case an url + isn't explicitly passed, it defaults to "N/A" on the ResourceManager. +
+
+ @param url
+ tracking URLfor the ApplicationMaster
]]>
+
ApplicationMaster
on it's completion.
+ + The response, includes: +
+ Note: The flag indicates whether the application has successfully + unregistered and is safe to stop. The application may stop after the flag is + true. If the application stops before the flag is true then the RM may retry + the application. + + @see ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)]]> +
ApplicationAttemptId
of an application attempt]]>
+ ApplicationAttemptId
of an application attempt]]>
+ ResourceManager
to get an
+ {@link ApplicationAttemptReport} for an application attempt.
+
+
+ + The request should include the {@link ApplicationAttemptId} of the + application attempt. +
+ + @see ApplicationAttemptReport + @see ApplicationHistoryProtocol#getApplicationAttemptReport(GetApplicationAttemptReportRequest)]]> +ApplicationAttemptReport
for the application attempt]]>
+ ApplicationAttemptReport
for the application attempt]]>
+ ResourceManager
to a client requesting
+ an application attempt report.
+
+
+ + The response includes an {@link ApplicationAttemptReport} which has the + details about the particular application attempt +
+ + @see ApplicationAttemptReport + @see ApplicationHistoryProtocol#getApplicationAttemptReport(GetApplicationAttemptReportRequest)]]> +ApplicationId
of an application]]>
+ ApplicationId
of an application]]>
+ ResourceManager
.
+
+
+ @see ApplicationHistoryProtocol#getApplicationAttempts(GetApplicationAttemptsRequest)]]>
+ ApplicationReport
of an application]]>
+ ApplicationReport
of an application]]>
+ ResourceManager
to a client requesting
+ a list of {@link ApplicationAttemptReport} for application attempts.
+
+
+
+ The ApplicationAttemptReport
for each application includes the
+ details of an application attempt.
+
ApplicationId
of the application]]>
+ ApplicationId
of the application]]>
+ ResourceManager
to
+ get an {@link ApplicationReport} for an application.
+
+ The request should include the {@link ApplicationId} of the + application.
+ + @see ApplicationClientProtocol#getApplicationReport(GetApplicationReportRequest) + @see ApplicationReport]]> +ApplicationReport
for the application]]>
+ ResourceManager
to a client
+ requesting an application report.
+
+ The response includes an {@link ApplicationReport} which has details such
+ as user, queue, name, host on which the ApplicationMaster
is
+ running, RPC port, tracking URL, diagnostics, start time etc.
ResourceManager
.
+
+
+ @see ApplicationClientProtocol#getApplications(GetApplicationsRequest)
+
+ Setting any of the parameters to null, would just disable that + filter
+ + @param scope {@link ApplicationsRequestScope} to filter by + @param users list of users to filter by + @param queues list of scheduler queues to filter by + @param applicationTypes types of applications + @param applicationTags application tags to filter by + @param applicationStates application states to filter by + @param startRange range of application start times to filter by + @param finishRange range of application finish times to filter by + @param limit number of applications to limit to + @return {@link GetApplicationsRequest} to be used with + {@link ApplicationClientProtocol#getApplications(GetApplicationsRequest)}]]> +ResourceManager
.
+
+
+ @param scope {@link ApplicationsRequestScope} to filter by
+ @see ApplicationClientProtocol#getApplications(GetApplicationsRequest)
+ @return a report of Applications in {@link GetApplicationsRequest}]]>
+ ResourceManager
.
+
+
+
+ @see ApplicationClientProtocol#getApplications(GetApplicationsRequest)
+ @return a report of Applications in {@link GetApplicationsRequest}]]>
+ ResourceManager
.
+
+
+
+ @see ApplicationClientProtocol#getApplications(GetApplicationsRequest)
+ @return a report of Applications in {@link GetApplicationsRequest}]]>
+ ResourceManager
.
+
+
+
+ @see ApplicationClientProtocol#getApplications(GetApplicationsRequest)
+ @return a report of Applications in GetApplicationsRequest
]]>
+ ResourceManager
.
+
+ @see ApplicationClientProtocol#getApplications(GetApplicationsRequest)]]>
+ ApplicationReport
for applications]]>
+ ResourceManager
to a client
+ requesting an {@link ApplicationReport} for applications.
+
+ The ApplicationReport
for each application includes details
+ such as user, queue, name, host on which the ApplicationMaster
+ is running, RPC port, tracking URL, diagnostics, start time etc.
ResourceManager
.
+
+
+ @see ApplicationClientProtocol#getAttributesToNodes
+ (GetAttributesToNodesRequest)]]>
+ ResourceManager
to a client requesting
+ node to attribute value mapping for all or given set of Node AttributeKey's.
+
+
+ @see ApplicationClientProtocol#getAttributesToNodes
+ (GetAttributesToNodesRequest)]]>
+ ResourceManager
.
+
+ Currently, this is empty.
+ + @see ApplicationClientProtocol#getClusterMetrics(GetClusterMetricsRequest)]]> +YarnClusterMetrics
for the cluster]]>
+ ResourceManager
.
+
+
+ @see ApplicationClientProtocol#getClusterNodeAttributes
+ (GetClusterNodeAttributesRequest)]]>
+ ResourceManager
to a client requesting
+ a node attributes in cluster.
+
+
+ @see ApplicationClientProtocol#getClusterNodeAttributes
+ (GetClusterNodeAttributesRequest)]]>
+ ResourceManager
.
+
+ The request will ask for all nodes in the given {@link NodeState}s.
+
+ @see ApplicationClientProtocol#getClusterNodes(GetClusterNodesRequest)]]>
+ NodeReport
for all nodes in the cluster]]>
+ ResourceManager
to a client
+ requesting a {@link NodeReport} for all nodes.
+
+ The NodeReport
contains per-node information such as
+ available resources, number of containers, tracking url, rack name, health
+ status etc.
+
+ @see NodeReport
+ @see ApplicationClientProtocol#getClusterNodes(GetClusterNodesRequest)]]>
+
ContainerId
of the Container]]>
+ ContainerId
of the container]]>
+ ResourceManager
to get an
+ {@link ContainerReport} for a container.
+ ]]>
+ ContainerReport
for the container]]>
+ ResourceManager
to a client requesting
+ a container report.
+
+
+ + The response includes a {@link ContainerReport} which has details of a + container. +
]]> +ApplicationAttemptId
of an application attempt]]>
+ ApplicationAttemptId
of an application attempt]]>
+ ResourceManager
.
+
+
+ @see ApplicationHistoryProtocol#getContainers(GetContainersRequest)]]>
+ ContainerReport
for all the containers of an
+ application attempt]]>
+ ContainerReport
for all the containers of
+ an application attempt]]>
+ ResourceManager
to a client requesting
+ a list of {@link ContainerReport} for containers.
+
+
+
+ The ContainerReport
for each container includes the container
+ details.
+
ContainerStatus
.
+
+ @return the list of ContainerId
s of containers for which to
+ obtain the ContainerStatus
.]]>
+ ContainerStatus
+
+ @param containerIds
+ a list of ContainerId
s of containers for which to
+ obtain the ContainerStatus
]]>
+ NodeManager
to get {@link ContainerStatus} of requested
+ containers.
+
+ @see ContainerManagementProtocol#getContainerStatuses(GetContainerStatusesRequest)]]>
+ ContainerStatus
es of the requested containers.]]>
+ ApplicationMaster
when asked to obtain the
+ ContainerStatus
of requested containers.
+
+ @see ContainerManagementProtocol#getContainerStatuses(GetContainerStatusesRequest)]]>
+ Currently, this is empty.
+ + @see ApplicationClientProtocol#getNewApplication(GetNewApplicationRequest)]]> +ApplicationId
allocated by the
+ ResourceManager
.
+ @return new ApplicationId
allocated by the
+ ResourceManager
]]>
+ ResourceManager
to the client for
+ a request to get a new {@link ApplicationId} for submitting applications.
+
+ Clients can submit an application with the returned + {@link ApplicationId}.
+ + @see ApplicationClientProtocol#getNewApplication(GetNewApplicationRequest)]]> +ResourceManager
to the client for
+ a request to get a new {@link ReservationId} for submitting reservations.
+
+ Clients can submit an reservation with the returned + {@link ReservationId}.
+ + {@code ApplicationClientProtocol#getNewReservation(GetNewReservationRequest)}]]> +ResourceManager
.
+
+
+ @see ApplicationClientProtocol#getNodesToAttributes
+ (GetNodesToAttributesRequest)]]>
+ ResourceManager
to a client requesting
+ nodes to attributes mapping.
+
+
+ @see ApplicationClientProtocol#getNodesToAttributes
+ (GetNodesToAttributesRequest)]]>
+ true
if applications' information is to be included,
+ else false
]]>
+ true
if information about child queues is required,
+ else false
]]>
+ true
if information about entire hierarchy is
+ required, false
otherwise]]>
+ ResourceManager
.
+
+ @see ApplicationClientProtocol#getQueueInfo(GetQueueInfoRequest)]]>
+ QueueInfo
for the specified queue]]>
+ ResourceManager
to
+ get queue acls for the current user.
+
+ Currently, this is empty.
+ + @see ApplicationClientProtocol#getQueueUserAcls(GetQueueUserAclsInfoRequest)]]> +QueueUserACLInfo
per queue for the user]]>
+ ResourceManager
to clients
+ seeking queue acls for the user.
+
+ The response contains a list of {@link QueueUserACLInfo} which + provides information about {@link QueueACL} per queue.
+ + @see QueueACL + @see QueueUserACLInfo + @see ApplicationClientProtocol#getQueueUserAcls(GetQueueUserAclsInfoRequest)]]> +Application Master
to the
+ Node Manager
to change the resource quota of a container.
+
+ @see ContainerManagementProtocol#increaseContainersResource(IncreaseContainersResourceRequest)]]>
+ NodeManager
to the
+ ApplicationMaster
when asked to increase container resource.
+
+
+ @see ContainerManagementProtocol#increaseContainersResource(IncreaseContainersResourceRequest)]]>
+ ApplicationId
of the application to be aborted]]>
+ ResourceManager
+ to abort a submitted application.
+
+ The request includes the {@link ApplicationId} of the application to be + aborted.
+ + @see ApplicationClientProtocol#forceKillApplication(KillApplicationRequest)]]> ++ The response, includes: +
ResourceManager
crashes before the process of killing the
+ application is completed, the ResourceManager
may retry this
+ application on recovery.
+
+ @see ApplicationClientProtocol#forceKillApplication(KillApplicationRequest)]]>
+ ApplicationId
of the application to be moved]]>
+ ApplicationId
of the application to be moved]]>
+ ResourceManager
+ to move a submitted application to a different queue.
+
+ The request includes the {@link ApplicationId} of the application to be + moved and the queue to place it in.
+ + @see ApplicationClientProtocol#moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest)]]> +ResourceManager
to the client moving
+ a submitted application to a different queue.
+
+ + A response without exception means that the move has completed successfully. +
+ + @see ApplicationClientProtocol#moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest)]]> +RegisterApplicationMasterRequest
]]>
+ ApplicationMaster
is
+ running.
+ @return host on which the ApplicationMaster
is running]]>
+ ApplicationMaster
is
+ running.
+ @param host host on which the ApplicationMaster
+ is running]]>
+ ApplicationMaster
.
+ This url if contains scheme then that will be used by resource manager
+ web application proxy otherwise it will default to http.
+ @return tracking URL for the ApplicationMaster
]]>
+ ApplicationMaster
while
+ it is running. This is the web-URL to which ResourceManager or
+ web-application proxy will redirect client/users while the application and
+ the ApplicationMaster
are still running.
+ + If the passed url has a scheme then that will be used by the + ResourceManager and web-application proxy, otherwise the scheme will + default to http. +
++ Empty, null, "N/A" strings are all valid besides a real URL. In case an url + isn't explicitly passed, it defaults to "N/A" on the ResourceManager. +
+
+ @param trackingUrl
+ tracking URLfor the ApplicationMaster
]]>
+
ApplicationACL
s]]>
+ The ClientToAMToken master key is sent to ApplicationMaster
+ by ResourceManager
via {@link RegisterApplicationMasterResponse}
+ , used to verify corresponding ClientToAMToken.
+ @return the queue that the application was placed in.]]> +
]]> +
ResourceManager
from previous application attempts.
+
+
+ @return the list of running containers as viewed by
+ ResourceManager
from previous application attempts
+ @see RegisterApplicationMasterResponse#getNMTokensFromPreviousAttempts()]]>
+ ContainerId
of the container to re-initialize.]]>
+ ContainerLaunchContext
of to re-initialize the
+ container with.]]>
+ ApplicationId
]]>
+ ApplicationId
]]>
+ key
]]>
+ SharedCacheManager
when
+ releasing a resource in the shared cache.
+
+
+ + Currently, this is empty. +
]]> +ResourceManager
to a client on
+ reservation submission.
+
+ Currently, this is empty.
+ + {@code ApplicationClientProtocol#submitReservation( + ReservationSubmissionRequest)}]]> +ContainerId
of the container to localize resources.]]>
+ LocalResource
required by the container]]>
+ ContainerId
of the container to signal.]]>
+ SignalContainerCommand
of the signal request.]]>
+ ResourceManager
+ or by the ApplicationMaster
to the NodeManager
+ to signal a container.
+ @see SignalContainerCommand ]]>
+ ResourceManager
to the client
+ signalling a container.
+
+ Currently it's empty.
+ + @see ApplicationClientProtocol#signalToContainer(SignalContainerRequest)]]> +NodeManager
.
+
+ @return ContainerLaunchContext
for the container to be started
+ by the NodeManager
]]>
+ NodeManager
+ @param context ContainerLaunchContext
for the container to be
+ started by the NodeManager
]]>
+ ApplicationMaster
to the
+ NodeManager
to start a container.
+
+ The ApplicationMaster
has to provide details such as
+ allocated resource capability, security tokens (if enabled), command
+ to be executed to start the container, environment for the process,
+ necessary binaries/jar/shared-objects etc. via the
+ {@link ContainerLaunchContext}.
ApplicationMaster
to the NodeManager
to
+ start containers.
+
+
+
+ In each {@link StartContainerRequest}, the ApplicationMaster
has
+ to provide details such as allocated resource capability, security tokens (if
+ enabled), command to be executed to start the container, environment for the
+ process, necessary binaries/jar/shared-objects etc. via the
+ {@link ContainerLaunchContext}.
+
ContainerId
s of the containers that are
+ started successfully.
+ @see ContainerManagementProtocol#startContainers(StartContainersRequest)]]>
+ NodeManager
.
+
+
+ The meta-data is returned as a Map between the auxiliary service names and
+ their corresponding per service meta-data as an opaque blob
+ ByteBuffer
+
+ To be able to interpret the per-service meta-data, you should consult the + documentation for the Auxiliary-service configured on the NodeManager +
+ + @return a Map between the names of auxiliary services and their + corresponding meta-data]]> +NodeManager
to the
+ ApplicationMaster
when asked to start an allocated
+ container.
+
+
+ @see ContainerManagementProtocol#startContainers(StartContainersRequest)]]>
+ ContainerId
s of containers to be stopped]]>
+ ContainerId
s of the containers to be stopped]]>
+ ApplicationMaster
to the
+ NodeManager
to stop containers.
+
+ @see ContainerManagementProtocol#stopContainers(StopContainersRequest)]]>
+ NodeManager
to the
+ ApplicationMaster
when asked to stop allocated
+ containers.
+
+
+ @see ContainerManagementProtocol#stopContainers(StopContainersRequest)]]>
+ ApplicationSubmissionContext
for the application]]>
+ ApplicationSubmissionContext
for the
+ application]]>
+ ResourceManager
.
+
+ The request, via {@link ApplicationSubmissionContext}, contains
+ details such as queue, {@link Resource} required to run the
+ ApplicationMaster
, the equivalent of
+ {@link ContainerLaunchContext} for launching the
+ ApplicationMaster
etc.
+
+ @see ApplicationClientProtocol#submitApplication(SubmitApplicationRequest)]]>
+
ResourceManager
to a client on
+ application submission.
+
+ Currently, this is empty.
+ + @see ApplicationClientProtocol#submitApplication(SubmitApplicationRequest)]]> +ApplicationId
of the application]]>
+ ApplicationId
of the application]]>
+ Priority
of the application to be set.]]>
+ Priority
of the application]]>
+ ResourceManager
to set or
+ update the application priority.
+
+ + The request includes the {@link ApplicationId} of the application and + {@link Priority} to be set for an application +
+ + @see ApplicationClientProtocol#updateApplicationPriority(UpdateApplicationPriorityRequest)]]> +Priority
of the application.]]>
+ Priority
of the application]]>
+ ResourceManager
to the client on update
+ the application priority.
+
+ + A response without exception means that the move has completed successfully. +
+ + @see ApplicationClientProtocol#updateApplicationPriority(UpdateApplicationPriorityRequest)]]> +ApplicationId
of the application]]>
+ ApplicationId
of the application]]>
+ ApplicationTimeouts
of the application.]]>
+ ApplicationTimeouts
s for the
+ application]]>
+ ResourceManager
to set or
+ update the application timeout.
+
+ + The request includes the {@link ApplicationId} of the application and timeout + to be set for an application +
]]> +ApplicationTimeouts
of the application.]]>
+ ApplicationTimeouts
s for the
+ application]]>
+ ResourceManager
to the client on update
+ application timeout.
+
+ + A response without exception means that the update has completed + successfully. +
]]> +ApplicationId
]]>
+ ApplicationId
]]>
+ key
]]>
+ SharedCacheManager
that claims a
+ resource in the shared cache.
+ ]]>
+ Path
if the resource exists in the shared
+ cache, null
otherwise]]>
+ Path
corresponding to a resource in the shared
+ cache]]>
+ ApplicationAttempId
.
+ @return ApplicationId
of the ApplicationAttempId
]]>
+ Application
.
+ @return attempt id
of the Application
]]>
+ ApplicationAttemptId
denotes the particular attempt
+ of an ApplicationMaster
for a given {@link ApplicationId}.
+
+ Multiple attempts might be needed to run an application to completion due
+ to temporal failures of the ApplicationMaster
such as hardware
+ failures, connectivity issues etc. on the node on which it was scheduled.
ApplicationMaster
.
+
+ @return RPC port of this attempt ApplicationMaster
]]>
+ ApplicationMaster
is running.
+
+ @return host on which this attempt of
+ ApplicationMaster
is running]]>
+ ApplicationAttemptId
of the attempt]]>
+ ContainerId
of the attempt]]>
+ ApplicationMaster
of this attempt is
+ running.ApplicationMaster
of this attempt.ResourceManager
.
+ @return short integer identifier of the ApplicationId
]]>
+ ResourceManager
which is
+ used to generate globally unique ApplicationId
.
+ @return start time of the ResourceManager
]]>
+ ApplicationId
represents the globally unique
+ identifier for an application.
+
+ The globally unique nature of the identifier is achieved by using the
+ cluster timestamp i.e. start-time of the
+ ResourceManager
along with a monotonically increasing counter
+ for the application.
ApplicationId
of the application]]>
+ ApplicationAttemptId
of the attempt]]>
+ ApplicationMaster
+ is running.
+ @return host on which the ApplicationMaster
+ is running]]>
+ ApplicationMaster
.
+ @return RPC port of the ApplicationMaster
]]>
+ ApplicationMaster
.
+
+ ClientToAMToken is the security token used by the AMs to verify
+ authenticity of any client
.
+
+ The ResourceManager
, provides a secure token (via
+ {@link ApplicationReport#getClientToAMToken()}) which is verified by the
+ ApplicationMaster when the client directly talks to an AM.
+
ApplicationMaster
]]>
+ YarnApplicationState
of the application]]>
+ + The AMRM token will be returned only if all the following conditions are + met: +
ApplicationMaster
is running.ApplicationMaster
.Resource
]]>
+ Resource
]]>
+ Resource
]]>
+ ApplicationId
of the submitted application]]>
+ ApplicationId
of the submitted
+ application]]>
+ Priority
of the application]]>
+ Container
with which the ApplicationMaster
is
+ launched.
+ @return ContainerLaunchContext
for the
+ ApplicationMaster
container]]>
+ Container
with which the ApplicationMaster
is
+ launched.
+ @param amContainer ContainerLaunchContext
for the
+ ApplicationMaster
container]]>
+ ApplicationMaster
for
+ this application.]]>
+ ApplicationMaster
+ for this application.]]>
+ + For unmanaged AM, if the flag is true, RM allows re-register and returns + the running containers in the same attempt back to the UAM for HA. +
+ + @param keepContainers the flag which indicates whether to keep containers + across application attempts.]]> +LogAggregationContext
of the application]]>
+ ApplicationTimeouts
of the application.]]>
+ + Note: If application timeout value is less than or equal to zero + then application submission will throw an exception. +
+ @param applicationTimeoutsApplicationTimeouts
s for the
+ application]]>
+ ApplicationMaster
is executed.
+ Resource
allocated to the container]]>
+ Container
was
+ allocated.
+ @return Priority
at which the Container
was
+ allocated]]>
+ ContainerToken
is the security token used by the framework
+ to verify authenticity of any Container
.
The ResourceManager
, on container allocation provides a
+ secure token which is verified by the NodeManager
on
+ container launch.
Applications do not need to care about ContainerToken
, they
+ are transparently handled by the framework - the allocated
+ Container
includes the ContainerToken
.
ContainerToken
for the container]]>
+ + The scheduler may return multiple {@code AllocateResponse}s corresponding + to the same ID as and when scheduler allocates {@code Container}s. + Applications can continue to completely ignore the returned ID in + the response and use the allocation for any of their outstanding requests. +
+ + @return the ID corresponding to the original allocation request + which is satisfied by this allocation.]]> +
+ It includes details such as: +
Container
was assigned.
+
+ Note: If containers are kept alive across application attempts via
+ {@link ApplicationSubmissionContext#setKeepContainersAcrossApplicationAttempts(boolean)}
+ the ContainerId
does not necessarily contain the current
+ running application attempt's ApplicationAttemptId
This
+ container can be allocated by previously exited application attempt and
+ managed by the current running attempt thus have the previous application
+ attempt's ApplicationAttemptId
.
+
ApplicationAttemptId
of the application to which the
+ Container
was assigned]]>
+ getContainerId
instead.
+ @return lower 32 bits of identifier of the ContainerId
]]>
+ ContainerId
]]>
+ ContainerId
represents a globally unique identifier
+ for a {@link Container} in the cluster.]]>
+ LocalResource
required by the container]]>
+ LocalResource
required by the container]]>
+ + This will be used to initialize this application on the specific + {@link AuxiliaryService} running on the NodeManager by calling + {@link AuxiliaryService#initializeApplication(ApplicationInitializationContext)} +
+ + @return application-specific binary service data]]> +ApplicationACL
s]]>
+ ApplicationACL
s for the application]]>
+ ContainerRetryContext
to relaunch container.]]>
+ ContainerRetryContext
to
+ relaunch container.]]>
+ ContainerId
of the container.]]>
+ Resource
of the container.]]>
+ NodeId
where container is running.]]>
+ Priority
of the container.]]>
+ ContainerState
of the container.]]>
+ exit status
of the container.]]>
+ Container
.]]>
+ Container
.]]>
+ ContainerId
of the container]]>
+ ExecutionType
of the container]]>
+ ContainerState
of the container]]>
+ Note: This is valid only for completed containers i.e. containers + with state {@link ContainerState#COMPLETE}. + Otherwise, it returns an ContainerExitStatus.INVALID. +
+ +Containers killed by the framework, either due to being released by + the application or being 'lost' due to node failures etc. have a special + exit code of ContainerExitStatus.ABORTED.
+ +When threshold number of the nodemanager-local-directories or + threshold number of the nodemanager-log-directories become bad, then + container is not launched and is exited with ContainersExitStatus.DISKS_FAILED. +
+ + @return exit status for the container]]> +Resource
allocated to the container]]>
+ ExecutionType
.]]>
+ LocalResourceType
of the resource to be localized]]>
+ LocalResourceType
of the resource to be localized]]>
+ LocalResourceVisibility
of the resource to be
+ localized]]>
+ LocalResourceVisibility
of the resource to be
+ localized]]>
+ PATTERN
).
+ @return pattern that should be used to extract entries from the
+ archive.]]>
+ PATTERN
).
+ @param pattern pattern that should be used to extract entries
+ from the archive.]]>
+ LocalResource
represents a local resource required to
+ run a container.
+
+ The NodeManager
is responsible for localizing the resource
+ prior to launching the container.
Applications can specify {@link LocalResourceType} and + {@link LocalResourceVisibility}.
+ + @see LocalResourceType + @see LocalResourceVisibility + @see ContainerLaunchContext + @see ApplicationSubmissionContext + @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)]]> ++ The type can be one of: +
NodeManager
.
+ + The visibility can be one of: +
NodeManager
for which the
+ NMToken is used to authenticate.]]>
+ NodeManager
]]>
+ NodeManager
+ It is issued by ResourceMananger
when ApplicationMaster
+ negotiates resource with ResourceManager
and
+ validated on NodeManager
side.
+ A given Node can be mapped with any kind of attribute, few examples are + HAS_SSD=true, JAVA_VERSION=JDK1.8, OS_TYPE=WINDOWS. +
+
+ Its not compulsory for all the attributes to have value, empty string is the
+ default value of the NodeAttributeType.STRING
+
+ Node Attribute Prefix is used as namespace to segregate the attributes. +
]]> ++ Node Attribute Prefix is used as namespace to segregate the attributes. +
]]> +node Attribute
.
+
+ Based on this attribute expressions and values will be evaluated.]]>
+ NodeId
is the unique identifier for a node.
+
+ It includes the hostname and port to uniquely
+ identify the node. Thus, it is unique across restarts of any
+ NodeManager
.
NodeId
of the node]]>
+ NodeState
of the node]]>
+ Resource
on the node.
+ @return used Resource
on the node]]>
+ Resource
on the node.
+ @return total Resource
on the node]]>
+ Node
.]]>
+ ResourceManager
.
+ @see AllocateRequest#setAskList(List)]]>
+ ResourceManager
. If the AM prefers a different set of
+ containers, then it may checkpoint or kill containers matching the
+ description in {@link #getResourceRequest}.
+ @return Set of containers at risk if the contract is not met.]]>
+ ApplicationMaster
(AM) can satisfy this request according
+ to its own priorities to prevent containers from being forcibly killed by
+ the platform.
+ @see PreemptionMessage]]>
+ + In contrast, the {@link PreemptionContract} also includes a description of + resources with a set of containers. If the AM releases containers matching + that profile, then the containers enumerated in {@link + PreemptionContract#getContainers()} may not be killed. +
+ Each preemption message reflects the RM's current understanding of the + cluster state, so a request to return N containers may not + reflect containers the AM is releasing, recently exited containers the RM has + yet to learn about, or new containers allocated before the message was + generated. Conversely, an RM may request a different profile of containers in + subsequent requests. +
+ The policy enforced by the RM is part of the scheduler. Generally, only + containers that have been requested consistently should be killed, but the + details are not specified.]]> +
QueueState
of the queue]]>
+ accessible node labels
of the queue]]>
+ ApplicationSubmissionContext
and
+ ResourceRequest
don't specify their
+ NodeLabelExpression
.
+
+ @return default node label expression
of the queue]]>
+ queue stats
of the queue]]>
+ QueueACL
for the given user]]>
+ QueueUserACLInfo
provides information {@link QueueACL} for
+ the given user.
+
+ @see QueueACL
+ @see ApplicationClientProtocol#getQueueUserAcls(org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest)]]>
+ + The globally unique nature of the identifier is achieved by using the + cluster timestamp i.e. start-time of the {@code ResourceManager} + along with a monotonically increasing counter for the reservation. +
]]> +Resource
models a set of computer resources in the
+ cluster.
+
+ Currently it models both memory and CPU.
+ +The unit for memory is megabytes. CPU is modeled with virtual cores + (vcores), a unit for expressing parallelism. A node's capacity should + be configured with virtual cores equal to its number of physical cores. A + container should be requested with the number of cores it can saturate, i.e. + the average number of threads it expects to have runnable at a time.
+ +Virtual cores take integer values and thus currently CPU-scheduling is + very coarse. A complementary axis for CPU requests that represents + processing power will likely be added in the future to enable finer-grained + resource configuration.
+ +Typically, applications request Resource
of suitable
+ capability to run their component tasks.
Priority
of the request]]>
+ Priority
of the request]]>
+ ResourceRequest
.]]>
+ ResourceRequest
.]]>
+ + +
If the flag is off on a rack-level ResourceRequest
,
+ containers at that request's priority will not be assigned to nodes on that
+ request's rack unless requests specifically for those nodes have also been
+ submitted.
+ +
If the flag is off on an {@link ResourceRequest#ANY}-level
+ ResourceRequest
, containers at that request's priority will
+ only be assigned on racks for which specific requests have also been
+ submitted.
+ +
For example, to request a container strictly on a specific node, the + corresponding rack-level and any-level requests should have locality + relaxation set to false. Similarly, to request a container strictly on a + specific rack, the corresponding any-level request should have locality + relaxation set to false.
+
+ @param relaxLocality whether locality relaxation is enabled with this
+ ResourceRequest
.]]>
+
+ The scheduler may return multiple {@code AllocateResponse}s corresponding + to the same ID as and when scheduler allocates {@code Container}(s). + Applications can continue to completely ignore the returned ID in + the response and use the allocation for any of their outstanding requests. +
+ If one wishes to replace an entire {@code ResourceRequest} corresponding to + a specific ID, they can simply cancel the corresponding {@code + ResourceRequest} and submit a new one afresh. + + @return the ID corresponding to this allocation request.]]> +
+ The scheduler may return multiple {@code AllocateResponse}s corresponding + to the same ID as and when scheduler allocates {@code Container}(s). + Applications can continue to completely ignore the returned ID in + the response and use the allocation for any of their outstanding requests. +
+ If one wishes to replace an entire {@code ResourceRequest} corresponding to + a specific ID, they can simply cancel the corresponding {@code + ResourceRequest} and submit a new one afresh. +
+ If the ID is not set, scheduler will continue to work as previously and all + allocated {@code Container}(s) will have the default ID, -1. + + @param allocationRequestID the ID corresponding to this allocation + request.]]> +
Resource
capability of the request]]>
+ Resource
capability of the request]]>
+ priority
of the request
+ @return {@link ResourceRequestBuilder}]]>
+ resourceName
of the request
+ @return {@link ResourceRequestBuilder}]]>
+ capability
of the request
+ @return {@link ResourceRequestBuilder}]]>
+ numContainers
of the request
+ @return {@link ResourceRequestBuilder}]]>
+ relaxLocality
of the request
+ @return {@link ResourceRequestBuilder}]]>
+ nodeLabelExpression
of the request
+ @return {@link ResourceRequestBuilder}]]>
+ executionTypeRequest
of the request
+ @return {@link ResourceRequestBuilder}]]>
+ executionType
of the request.
+ @return {@link ResourceRequestBuilder}]]>
+ allocationRequestId
of the request
+ @return {@link ResourceRequestBuilder}]]>
+ ResourceUtilization
models the utilization of a set of computer
+ resources in the cluster.
+ ]]>
+ allocationRequestId
of the
+ request
+ @return {@link SchedulingRequest.SchedulingRequestBuilder}]]>
+ priority
of the request
+ @return {@link SchedulingRequest.SchedulingRequestBuilder}
+ @see SchedulingRequest#setPriority(Priority)]]>
+ executionType
of the request
+ @return {@link SchedulingRequest.SchedulingRequestBuilder}]]>
+ allocationsTags
of the request
+ @return {@link SchedulingRequest.SchedulingRequestBuilder}]]>
+ resourceSizing
of the request
+ @return {@link SchedulingRequest.SchedulingRequestBuilder}]]>
+ placementConstraints
of
+ the request
+ @return {@link SchedulingRequest.SchedulingRequestBuilder}]]>
+ ResourceManager
.
+ @return the set of {@link ContainerId} to be preempted.]]>
+ Token
is the security entity used by the framework
+ to verify authenticity of any resource.]]>
+ ContainerId
of the container]]>
+ ContainerUpdateType
of the container.]]>
+ ContainerId
of the container]]>
+ ContainerId
of the container]]>
+ ExecutionType
of the container]]>
+ ExecutionType
of the container]]>
+ Resource
capability of the request]]>
+ Resource
capability of the request]]>
+ URL
represents a serializable {@link java.net.URL}.]]>
+ NodeManager
s in the cluster]]>
+ DecommissionedNodeManager
s in the cluster]]>
+ ActiveNodeManager
s in the cluster]]>
+ LostNodeManager
s in the cluster]]>
+ UnhealthyNodeManager
s in the cluster]]>
+ RebootedNodeManager
s in the cluster]]>
+ YarnClusterMetrics
represents cluster metrics.
+
+ Currently only number of NodeManager
s is provided.
+ The reader and writer users/groups pattern that the user can supply is the
+ same as what AccessControlList
takes.
+
+ Primary filters will be used to index the entities in
+ TimelineStore
, such that users should carefully choose the
+ information they want to store as the primary filters. The remaining can be
+ stored as other information.
+
TimelineEntityGroupId
.
+
+ @return ApplicationId
of the
+ TimelineEntityGroupId
]]>
+ timelineEntityGroupId
]]>
+ TimelineEntityGroupId
is an abstract way for
+ timeline service users to represent #a group of related timeline data.
+ For example, all entities that represents one data flow DAG execution
+ can be grouped into one timeline entity group. ]]>
+
+ The reader and writer users/groups pattern that the user can supply is the
+ same as what AccessControlList
takes.
+
+ It is usually used in the case where we want to recover class polymorphism + after deserializing the entity from its JSON form. +
+ @param entity the real entity that carries information]]> ++ Users can use {@link TimelineServiceHelper#invertLong(long)} to invert + the prefix if necessary. + + @param entityIdPrefix prefix for an entity.]]> ++ TimelineEntity entity = new TimelineEntity(); + entity.setIdPrefix(value); +
InetSocketAddress
. On an HA cluster,
+ this fetches the address corresponding to the RM identified by
+ {@link #RM_HA_ID}.
+ @param name property name.
+ @param defaultAddress the default value
+ @param defaultPort the default port
+ @return InetSocketAddress]]>
+ + Note: Use {@link #DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH} for + cross-platform practice i.e. submit an application from a Windows client to + a Linux/Unix server or vice versa. +
]]> +SharedCacheManager
to run a cleaner task
+ @return SharedCacheManager
returns an empty response
+ on success and throws an exception on rejecting the request
+ @throws YarnException
+ @throws IOException]]>
+ SharedCacheManager
+ ]]>
+ foo(3)
+ + Optional when using NodeAttribute Constraint. + + and where Pn can be any form of a valid constraint expression, + such as: + +YARN
verifies access to the application, queue
+ etc. before accepting the request.
+
+ If the user does not have VIEW_APP
access then the following
+ fields in the report will be set to stubbed values:
+
+ If the user does not have VIEW_APP
access for an application
+ then the corresponding report will be filtered as described in
+ {@link #getApplicationReport(ApplicationId)}.
+
+ In secure mode, YARN
verifies access to the application, queue
+ etc. before accepting the request.
+
+ In secure mode, YARN
verifies access to the application, queue
+ etc. before accepting the request.
+
ResourceManager
. New containers assigned to the master are
+ retrieved. Status of completed containers and node health updates are also
+ retrieved. This also doubles up as a heartbeat to the ResourceManager and
+ must be made periodically. The call may not always return any new
+ allocations of containers. App should not make concurrent allocate
+ requests. May cause request loss.
+
+ + Note : If the user has not removed container requests that have already + been satisfied, then the re-register may end up sending the entire + container requests to the RM (including matched requests). Which would mean + the RM could end up giving it a lot of new allocated containers. +
+ + @param progressIndicator Indicates progress made by the master + @return the response of the allocate request + @throws YarnException + @throws IOException]]> +Container
s that are returned from previous successful
+ allocations or resource changes. By passing in the existing container and a
+ target resource capability to this method, the application requests the
+ ResourceManager to change the existing resource allocation to the target
+ resource allocation.
+
+ @deprecated use
+ {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
+
+ @param container The container returned from the last successful resource
+ allocation or resource change
+ @param capability The target resource capability of the container]]>
+ UpdateContainerRequest
.]]>
+ addContainerRequest
earlier in the lifecycle. For performance,
+ the AMRMClient may return its internal collection directly without creating
+ a copy. Users should not perform mutable operations on the return value.
+ Each collection in the list contains requests with identical
+ Resource
size that fit in the given capability. In a
+ collection, requests will be returned in the same order as they were added.
+
+ NOTE: This API only matches Container requests that were created by the
+ client WITHOUT the allocationRequestId being set.
+
+ @return Collection of request matching the parameters]]>
+ addContainerRequest
earlier in the lifecycle. For performance,
+ the AMRMClient may return its internal collection directly without creating
+ a copy. Users should not perform mutable operations on the return value.
+ Each collection in the list contains requests with identical
+ Resource
size that fit in the given capability. In a
+ collection, requests will be returned in the same order as they were added.
+ specify an ExecutionType
.
+
+ NOTE: This API only matches Container requests that were created by the
+ client WITHOUT the allocationRequestId being set.
+
+ @param priority Priority
+ @param resourceName Location
+ @param executionType ExecutionType
+ @param capability Capability
+ @return Collection of request matching the parameters]]>
+ addContainerRequest
earlier in the lifecycle. For performance,
+ the AMRMClient may return its internal collection directly without creating
+ a copy. Users should not perform mutable operations on the return value.
+
+ NOTE: This API only matches Container requests that were created by the
+ client WITH the allocationRequestId being set to a non-default value.
+
+ @param allocationRequestId Allocation Request Id
+ @return Collection of request matching the parameters]]>
+ AMRMClient
+ + If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + singleton instance will be used. + + @param nmTokenCache the NM token cache to use.]]> +
AMRMClient
.
+ + If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + singleton instance will be used. + + @return the NM token cache.]]> +
checkEveryMillis
ms.
+ See also {@link #waitFor(java.util.function.Supplier, int, int)}
+ @param check user defined checker
+ @param checkEveryMillis interval to call check
]]>
+ checkEveryMillis
ms. In the main loop, this method will log
+ the message "waiting in main loop" for each logInterval
times
+ iteration to confirm the thread is alive.
+ @param check user defined checker
+ @param checkEveryMillis interval to call check
+ @param logInterval interval to log for each]]>
+ The ApplicationMaster
or other applications that use the
+ client must provide the details of the allocated container, including the
+ Id, the assigned node's Id and the token via {@link Container}. In
+ addition, the AM needs to provide the {@link ContainerLaunchContext} as
+ well.
NodeManager
to launch the
+ container
+ @return a map between the auxiliary service names and their outputs
+ @throws YarnException YarnException.
+ @throws IOException IOException.]]>
+ The ApplicationMaster
or other applications that use the
+ client must provide the details of the container, including the Id and
+ the target resource encapsulated in the updated container token via
+ {@link Container}.
+
The ApplicationMaster
or other applications that use the
+ client must provide the details of the container, including the Id and
+ the target resource encapsulated in the updated container token via
+ {@link Container}.
+
NodeManager
+
+ @throws YarnException YarnException.
+ @throws IOException IOException.]]>
+ NodeManager
+
+ @return the status of a container.
+
+ @throws YarnException YarnException.
+ @throws IOException IOException.]]>
+ NMClient
+ + If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + singleton instance will be used. + + @param nmTokenCache the NM token cache to use.]]> +
NMClient
+ + If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} + singleton instance will be used. + + @return the NM token cache]]> +
+ NMTokenCache nmTokenCache = new NMTokenCache(); + AMRMClient rmClient = AMRMClient.createAMRMClient(); + NMClient nmClient = NMClient.createNMClient(); + nmClient.setNMTokenCache(nmTokenCache); + ... ++
+ NMTokenCache nmTokenCache = new NMTokenCache(); + AMRMClient rmClient = AMRMClient.createAMRMClient(); + NMClient nmClient = NMClient.createNMClient(); + nmClient.setNMTokenCache(nmTokenCache); + AMRMClientAsync rmClientAsync = new AMRMClientAsync(rmClient, 1000, [AMRM_CALLBACK]); + NMClientAsync nmClientAsync = new NMClientAsync("nmClient", nmClient, [NM_CALLBACK]); + ... ++
+ NMTokenCache nmTokenCache = new NMTokenCache(); + ... + ApplicationMasterProtocol amPro = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class); + ... + AllocateRequest allocateRequest = ... + ... + AllocateResponse allocateResponse = rmClient.allocate(allocateRequest); + for (NMToken token : allocateResponse.getNMTokens()) { + nmTokenCache.setToken(token.getNodeId().toString(), token.getToken()); + } + ... + ContainerManagementProtocolProxy nmPro = ContainerManagementProtocolProxy(conf, nmTokenCache); + ... + nmPro.startContainer(container, containerContext); + ... ++
SharedCacheManager.
+ The client uses a checksum to identify the resource and an
+ {@link ApplicationId} to identify which application will be using the
+ resource.
+
+
+
+ The SharedCacheManager
responds with whether or not the
+ resource exists in the cache. If the resource exists, a URL
to
+ the resource in the shared cache is returned. If the resource does not
+ exist, null is returned instead.
+
+ Once a URL has been returned for a resource, that URL is safe to use for + the lifetime of the application that corresponds to the provided + ApplicationId. +
+ + @param applicationId ApplicationId of the application using the resource + @param resourceKey the key (i.e. checksum) that identifies the resource + @return URL to the resource, or null if it does not exist]]> +SharedCacheManager.
+ This method is called once an application is no longer using a claimed
+ resource in the shared cache. The client uses a checksum to identify the
+ resource and an {@link ApplicationId} to identify which application is
+ releasing the resource.
+
+
+ + Note: This method is an optimization and the client is not required to call + it for correctness. +
+ + @param applicationId ApplicationId of the application releasing the + resource + @param resourceKey the key (i.e. checksum) that identifies the resource]]> +YARN.
It is a blocking call - it
+ will not return {@link ApplicationId} until the submitted application is
+ submitted successfully and accepted by the ResourceManager.
+
+
+ + Users should provide an {@link ApplicationId} as part of the parameter + {@link ApplicationSubmissionContext} when submitting a new application, + otherwise it will throw the {@link ApplicationIdNotProvidedException}. +
+ +This internally calls {@link ApplicationClientProtocol#submitApplication + (SubmitApplicationRequest)}, and after that, it internally invokes + {@link ApplicationClientProtocol#getApplicationReport + (GetApplicationReportRequest)} and waits till it can make sure that the + application gets properly submitted. If RM fails over or RM restart + happens before ResourceManager saves the application's state, + {@link ApplicationClientProtocol + #getApplicationReport(GetApplicationReportRequest)} will throw + the {@link ApplicationNotFoundException}. This API automatically resubmits + the application with the same {@link ApplicationSubmissionContext} when it + catches the {@link ApplicationNotFoundException}
+ + @param appContext + {@link ApplicationSubmissionContext} containing all the details + needed to submit a new application + @return {@link ApplicationId} of the accepted application + @throws YarnException + @throws IOException + @see #createApplication()]]> +
+ In secure mode, YARN
verifies access to the application, queue
+ etc. before accepting the request.
+
+ If the user does not have VIEW_APP
access then the following
+ fields in the report will be set to stubbed values:
+
+ The AMRM token will be returned only if all the following conditions are + met: +
+ If the user does not have VIEW_APP
access for an application
+ then the corresponding report will be filtered as described in
+ {@link #getApplicationReport(ApplicationId)}.
+
+ If the user does not have VIEW_APP
access for an application
+ then the corresponding report will be filtered as described in
+ {@link #getApplicationReport(ApplicationId)}.
+
+ If the user does not have VIEW_APP
access for an application
+ then the corresponding report will be filtered as described in
+ {@link #getApplicationReport(ApplicationId)}.
+
+ If the user does not have VIEW_APP
access for an application
+ then the corresponding report will be filtered as described in
+ {@link #getApplicationReport(ApplicationId)}.
+
+ If the user does not have VIEW_APP
access for an application
+ then the corresponding report will be filtered as described in
+ {@link #getApplicationReport(ApplicationId)}.
+
+ If the user does not have VIEW_APP
access for an application
+ then the corresponding report will be filtered as described in
+ {@link #getApplicationReport(ApplicationId)}.
+
+ If the user does not have VIEW_APP
access for an application
+ then the corresponding report will be filtered as described in
+ {@link #getApplicationReport(ApplicationId)}.
+
+ In secure mode, YARN
verifies access to the application, queue
+ etc. before accepting the request.
+
+ In secure mode, YARN
verifies access to the application, queue
+ etc. before accepting the request.
+
+ The client packages all details of its request in a + {@link ReservationSubmissionRequest} object. This contains information + about the amount of capacity, temporal constraints, and gang needs. + Furthermore, the reservation might be composed of multiple stages, with + ordering dependencies among them. +
+ ++ In order to respond, a new admission control component in the + {@code ResourceManager} performs an analysis of the resources that have + been committed over the period of time the user is requesting, verify that + the user requests can be fulfilled, and that it respect a sharing policy + (e.g., {@code CapacityOverTimePolicy}). Once it has positively determined + that the ReservationRequest is satisfiable the {@code ResourceManager} + answers with a {@link ReservationSubmissionResponse} that includes a + {@link ReservationId}. Upon failure to find a valid allocation the response + is an exception with the message detailing the reason of failure. +
+ ++ The semantics guarantees that the {@link ReservationId} returned, + corresponds to a valid reservation existing in the time-range request by + the user. The amount of capacity dedicated to such reservation can vary + overtime, depending of the allocation that has been determined. But it is + guaranteed to satisfy all the constraint expressed by the user in the + {@link ReservationDefinition} +
+ + @param request request to submit a new Reservation + @return response contains the {@link ReservationId} on accepting the + submission + @throws YarnException if the reservation cannot be created successfully + @throws IOException]]> ++ The allocation is attempted by virtually substituting all previous + allocations related to this Reservation with new ones, that satisfy the new + {@link ReservationDefinition}. Upon success the previous allocation is + atomically substituted by the new one, and on failure (i.e., if the system + cannot find a valid allocation for the updated request), the previous + allocation remains valid. +
+ + @param request to update an existing Reservation (the + {@link ReservationUpdateRequest} should refer to an existing valid + {@link ReservationId}) + @return response empty on successfully updating the existing reservation + @throws YarnException if the request is invalid or reservation cannot be + updated successfully + @throws IOException]]> +ResourceManager
]]>
+ ResourceManager
]]>
+ Container
s that are returned from previous successful
+ allocations or resource changes. By passing in the existing container and a
+ target resource capability to this method, the application requests the
+ ResourceManager to change the existing resource allocation to the target
+ resource allocation.
+
+ @deprecated use
+ {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
+
+ @param container The container returned from the last successful resource
+ allocation or resource change
+ @param capability The target resource capability of the container]]>
+ UpdateContainerRequest
.]]>
+ checkEveryMillis
ms.
+ See also {@link #waitFor(java.util.function.Supplier, int, int)}
+ @param check user defined checker
+ @param checkEveryMillis interval to call check
]]>
+ checkEveryMillis
ms. In the main loop, this method will log
+ the message "waiting in main loop" for each logInterval
times
+ iteration to confirm the thread is alive.
+ @param check user defined checker
+ @param checkEveryMillis interval to call check
+ @param logInterval interval to log for each]]>
+ + {@code + class MyCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler { + public void onContainersAllocated(List+ + The client's lifecycle should be managed similarly to the following: + +containers) { + [run tasks on the containers] + } + + public void onContainersUpdated(List containers) { + [determine if resource allocation of containers have been increased in + the ResourceManager, and if so, inform the NodeManagers to increase the + resource monitor/enforcement on the containers] + } + + public void onContainersCompleted(List statuses) { + [update progress, check whether app is done] + } + + public void onNodesUpdated(List updated) {} + + public void onReboot() {} + } + } +
+ {@code + AMRMClientAsync asyncClient = + createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler()); + asyncClient.init(conf); + asyncClient.start(); + RegisterApplicationMasterResponse response = asyncClient + .registerApplicationMaster(appMasterHostname, appMasterRpcPort, + appMasterTrackingUrl); + asyncClient.addContainerRequest(containerRequest); + [... wait for application to complete] + asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl); + asyncClient.stop(); + } +]]> +
The ApplicationMaster
or other applications that use the
+ client must provide the details of the container, including the Id and
+ the target resource encapsulated in the updated container token via
+ {@link Container}.
+
+ {@code + class MyCallbackHandler extends NMClientAsync.AbstractCallbackHandler { + public void onContainerStarted(ContainerId containerId, + Map+ + The client's life-cycle should be managed like the following: + +allServiceResponse) { + [post process after the container is started, process the response] + } + + public void onContainerResourceIncreased(ContainerId containerId, + Resource resource) { + [post process after the container resource is increased] + } + + public void onContainerStatusReceived(ContainerId containerId, + ContainerStatus containerStatus) { + [make use of the status of the container] + } + + public void onContainerStopped(ContainerId containerId) { + [post process after the container is stopped] + } + + public void onStartContainerError( + ContainerId containerId, Throwable t) { + [handle the raised exception] + } + + public void onGetContainerStatusError( + ContainerId containerId, Throwable t) { + [handle the raised exception] + } + + public void onStopContainerError( + ContainerId containerId, Throwable t) { + [handle the raised exception] + } + } + } +
+ {@code + NMClientAsync asyncClient = + NMClientAsync.createNMClientAsync(new MyCallbackhandler()); + asyncClient.init(conf); + asyncClient.start(); + asyncClient.startContainer(container, containerLaunchContext); + [... wait for container being started] + asyncClient.getContainerStatus(container.getId(), container.getNodeId(), + container.getContainerToken()); + [... handle the status in the callback instance] + asyncClient.stopContainer(container.getId(), container.getNodeId(), + container.getContainerToken()); + [... wait for container being stopped] + asyncClient.stop(); + } +]]> +
yarn.sharedcache.checksum.algo.impl
)
+
+ @return SharedCacheChecksum
object]]>
+ + Note that header and ellipses are not counted against {@link #limit}. +
+ An example: + +
+ {@code + // At the beginning it's an empty string + final Appendable shortAppender = new BoundedAppender(80); + // The whole message fits into limit + shortAppender.append( + "message1 this is a very long message but fitting into limit\n"); + // The first message is truncated, the second not + shortAppender.append("message2 this is shorter than the previous one\n"); + // The first message is deleted, the second truncated, the third + // preserved + shortAppender.append("message3 this is even shorter message, maybe.\n"); + // The first two are deleted, the third one truncated, the last preserved + shortAppender.append("message4 the shortest one, yet the greatest :)"); + // Current contents are like this: + // Diagnostic messages truncated, showing last 80 chars out of 199: + // ...s is even shorter message, maybe. + // message4 the shortest one, yet the greatest :) + } ++
+ Note that null values are {@link #append(CharSequence) append}ed + just like in {@link StringBuilder#append(CharSequence) original + implementation}. +
+ Note that this class is not thread safe.]]> +
false
]]>
+