Merge -r 1215140:1215141 from trunk to branch. FIXES: MAPREDUCE-778

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1234070 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-01-20 18:55:20 +00:00
parent 5809510395
commit 71ac65859e
45 changed files with 4603 additions and 249 deletions

View File

@ -5,6 +5,7 @@ Release 0.23.1 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
NEW FEATURES NEW FEATURES
MAPREDUCE-778. Rumen Anonymizer. (Amar Kamat and Chris Douglas via amarrk)
MAPREDUCE-3121. NodeManager should handle disk-failures (Ravi Gummadi via mahadev) MAPREDUCE-3121. NodeManager should handle disk-failures (Ravi Gummadi via mahadev)
@ -14,6 +15,8 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR ApplicationMaster. MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR ApplicationMaster.
(Anupam Seth via mahadev) (Anupam Seth via mahadev)
MAPREDUCE-778. Rumen Anonymizer. (Amar Kamat and Chris Douglas via amarrk)
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-3375. [Gridmix] Memory Emulation system tests. MAPREDUCE-3375. [Gridmix] Memory Emulation system tests.
(Vinay Thota via amarrk) (Vinay Thota via amarrk)

View File

@ -139,6 +139,13 @@
<dependency org="org.vafer" name="jdeb" rev="${jdeb.version}" conf="package->master"/> <dependency org="org.vafer" name="jdeb" rev="${jdeb.version}" conf="package->master"/>
<dependency org="org.mortbay.jetty" name="jetty-servlet-tester" rev="${jetty.version}" <dependency org="org.mortbay.jetty" name="jetty-servlet-tester" rev="${jetty.version}"
conf="test->default"/> conf="test->default"/>
<!-- dependency for rumen anonymization -->
<dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="${jackson.version}"
conf="compile->default"/>
<dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="${jackson.version}"
conf="compile->default"/>
<!-- dependency addition for the fault injection --> <!-- dependency addition for the fault injection -->
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}" <dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
conf="compile->default"/> conf="compile->default"/>

View File

@ -81,5 +81,6 @@ wagon-http.version=1.0-beta-2
xmlenc.version=0.52 xmlenc.version=0.52
xerces.version=1.4.4 xerces.version=1.4.4
yarn.version=0.23.1-SNAPSHOT jackson.version=1.8.2
hadoop-mapreduce.version=0.23.1-SNAPSHOT yarn.version=0.24.0-SNAPSHOT
hadoop-mapreduce.version=0.24.0-SNAPSHOT

View File

@ -26,8 +26,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -49,6 +47,7 @@ import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory; import org.apache.hadoop.tools.rumen.JobStory;
import static org.apache.hadoop.tools.rumen.datatypes.util.MapReduceJobPropertiesParser.extractMaxHeapOpts;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -92,8 +91,6 @@ abstract class GridmixJob implements Callable<Job>, Delayed {
// configuration key to enable/disable task jvm options // configuration key to enable/disable task jvm options
static final String GRIDMIX_TASK_JVM_OPTIONS_ENABLE = static final String GRIDMIX_TASK_JVM_OPTIONS_ENABLE =
"gridmix.task.jvm-options.enable"; "gridmix.task.jvm-options.enable";
private static final Pattern maxHeapPattern =
Pattern.compile("-Xmx[0-9]+[kKmMgGtT]?+");
private static void setJobQueue(Job job, String queue) { private static void setJobQueue(Job job, String queue) {
if (queue != null) { if (queue != null) {
@ -226,18 +223,6 @@ abstract class GridmixJob implements Callable<Job>, Delayed {
} }
} }
private static void extractMaxHeapOpts(String javaOptions,
List<String> maxOpts, List<String> others) {
for (String opt : javaOptions.split(" ")) {
Matcher matcher = maxHeapPattern.matcher(opt);
if (matcher.find()) {
maxOpts.add(opt);
} else {
others.add(opt);
}
}
}
// Scales the desired job-level configuration parameter. This API makes sure // Scales the desired job-level configuration parameter. This API makes sure
// that the ratio of the job level configuration parameter to the cluster // that the ratio of the job level configuration parameter to the cluster
// level configuration parameter is maintained in the simulated run. Hence // level configuration parameter is maintained in the simulated run. Hence

View File

@ -73,6 +73,11 @@
computed for the total number of successful tasks for every attempt. computed for the total number of successful tasks for every attempt.
</li> </li>
<li>Anonymized traces enables sharing of production traces of large
scale Hadoop deployments. Sharing of traces will foster
collaboration within the Hadoop community. It can also be used to
supplement interesting research findings.
</li>
</ul> </ul>
</section> </section>
@ -102,6 +107,11 @@
Increasing the trace runtime might involve adding some dummy jobs to Increasing the trace runtime might involve adding some dummy jobs to
the resulting trace and scaling up the runtime of individual jobs. the resulting trace and scaling up the runtime of individual jobs.
</li> </li>
<li><em>Anonymizer</em> :
A utility to anonymize Hadoop job and cluster topology traces by
masking certain sensitive fields but retaining important workload
characteristics.
</li>
</ul> </ul>
<p></p><p></p><p></p> <p></p><p></p><p></p>
@ -128,10 +138,11 @@
<code>output-duration</code>, <code>concentration</code> etc. <code>output-duration</code>, <code>concentration</code> etc.
</note> </note>
<p><em>Rumen</em> provides 2 basic commands</p> <p><em>Rumen</em> provides 3 basic commands</p>
<ul> <ul>
<li><code>TraceBuilder</code></li> <li><code>TraceBuilder</code></li>
<li><code>Folder</code></li> <li><code>Folder</code></li>
<li><code>Anonymizer</code></li>
</ul> </ul>
<p>Firstly, we need to generate the <em>Gold Trace</em>. Hence the first <p>Firstly, we need to generate the <em>Gold Trace</em>. Hence the first
@ -139,8 +150,9 @@
The output of the <code>TraceBuilder</code> is a job-trace file (and an The output of the <code>TraceBuilder</code> is a job-trace file (and an
optional cluster-topology file). In case we want to scale the output, we optional cluster-topology file). In case we want to scale the output, we
can use the <code>Folder</code> utility to fold the current trace to the can use the <code>Folder</code> utility to fold the current trace to the
desired length. The remaining part of this section explains these desired length. For anonymizing the trace, use the
utilities in detail. <code>Anonymizer</code> utility. The remaining part of this section
explains these utilities in detail.
</p> </p>
<note>Examples in this section assumes that certain libraries are present <note>Examples in this section assumes that certain libraries are present
@ -426,8 +438,156 @@
</p> </p>
</section> </section>
</section> </section>
<p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p>
<p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p>
<p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p>
<p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p>
<p></p><p></p><p></p><p></p>
</section>
<!--
Anonymizer command
-->
<section>
<title>Anonymizer</title>
<p><code>Command:</code></p>
<source>java org.apache.hadoop.tools.rumen.Anonymizer [options] [-trace &lt;jobtrace-input&gt; &lt;jobtrace-output&gt;] [-topology &lt;topology-input&gt; &lt;topology-output&gt;]</source>
<p>This command invokes the <em>Anonymizer</em> utility of
<em>Rumen</em>. It anonymizes sensitive information from the
<code>&lt;jobtrace-input&gt;</code> file and outputs the anonymized
content into the <code>&lt;jobtrace-output&gt;</code>
file. It also anonymizes the cluster layout (topology) from the
<code>&lt;topology-input&gt;</code> and outputs it in
the <code>&lt;topology-output&gt;</code> file.
<code>&lt;job-input&gt;</code> represents the job trace file obtained
using <code>TraceBuilder</code> or <code>Folder</code>.
<code>&lt;topology-input&gt;</code> represents the cluster topology
file obtained using <code>TraceBuilder</code>.
</p>
<p><code>Options :</code></p>
<table>
<tr>
<th>Parameter</th>
<th>Description</th>
<th>Notes</th>
</tr>
<tr>
<td><code>-trace</code></td>
<td>Anonymizes job traces.</td>
<td>Anonymizes sensitive fields like user-name, job-name, queue-name
host-names, job configuration parameters etc.</td>
</tr>
<tr>
<td><code>-topology</code></td>
<td>Anonymizes cluster topology</td>
<td>Anonymizes rack-names and host-names.</td>
</tr>
</table>
<section id="anonymizerconf">
<title><em>Anonymizer</em> Configuration Parameters</title>
<p>The Rumen anonymizer can be configured using the following
configuration parameters:
</p>
<table>
<tr>
<th>Parameter</th>
<th>Description</th>
</tr>
<tr>
<td>
<code>rumen.data-types.classname.preserve</code>
</td>
<td>A comma separated list of prefixes that the <em>Anonymizer</em>
will not anonymize while processing classnames. If
<code>rumen.data-types.classname.preserve</code> is set to
<code>'org.apache,com.hadoop.'</code> then
classnames starting with <code>'org.apache'</code> or
<code>'com.hadoop.'</code> will not be anonymized.
</td>
</tr>
<tr>
<td>
<code>rumen.datatypes.jobproperties.parsers</code>
</td>
<td>A comma separated list of job properties parsers. These parsers
decide how the job configuration parameters
(i.e &lt;key,value&gt; pairs) should be processed. Default is
<code>MapReduceJobPropertiesParser</code>. The default parser will
only parse framework-level MapReduce specific job configuration
properties. Users can add custom parsers by implementing the
<code>JobPropertiesParser</code> interface. Rumen also provides an
all-pass (i.e no filter) parser called
<code>DefaultJobPropertiesParser</code>.
</td>
</tr>
<tr>
<td>
<code>rumen.anonymization.states.dir</code>
</td>
<td>Set this to a location (on LocalFileSystem or HDFS) for enabling
state persistence and/or reload. This parameter is not set by
default. Reloading and persistence of states depend on the state
directory. Note that the state directory will contain the latest
as well as previous states.
</td>
</tr>
<tr>
<td>
<code>rumen.anonymization.states.persist</code>
</td>
<td>Set this to <code>'true'</code> to persist the current state.
Default value is <code>'false'</code>. Note that the states will
be persisted to the state manager's state directory
specified using the <code>rumen.anonymization.states.dir</code>
parameter.
</td>
</tr>
<tr>
<td>
<code>rumen.anonymization.states.reload</code>
</td>
<td>Set this to <code>'true'</code> to enable reuse of previously
persisted state. The default value is <code>'false'</code>. The
previously persisted state will be reloaded from the state
manager's state directory specified using the
<code>rumen.anonymization.states.dir</code> parameter. Note that
the <em>Anonymizer</em> will bail out if it fails to find any
previously persisted state in the state directory or if the state
directory is not set. If the user wishes to retain/reuse the
states across multiple invocations of the <em>Anonymizer</em>,
then the very first invocation of the <em>Anonymizer</em> should
have <code>rumen.anonymization.states.reload</code> set to
<code>'false'</code> and
<code>rumen.anonymization.states.persist</code> set to
<code>'true'</code>. Subsequent invocations of the
<em>Anonymizer</em> can then have
<code>rumen.anonymization.states.reload</code> set to
<code>'true'</code>.
</td>
</tr>
</table>
</section>
<section>
<title>Example</title>
<source>java org.apache.hadoop.tools.rumen.Anonymizer -trace file:///home/user/job-trace.json file:///home/user/job-trace-anonymized.json -topology file:///home/user/cluster-topology.json file:///home/user/cluster-topology-anonymized.json</source>
<p></p>
<p>This will anonymize the job details from
<code>file:///home/user/job-trace.json</code> and output it to
<code>file:///home/user/job-trace-anonymized.json</code>.
It will also anonymize the cluster topology layout from
<code>file:///home/user/cluster-topology.json</code> and output it to
<code>file:///home/user/cluster-topology-anonymized.json</code>.
Note that the <code>Anonymizer</code> also supports input and output
files on HDFS.
</p>
</section>
</section> </section>
<p></p><p></p><p></p>
</section> </section>
<!-- <!--
@ -452,8 +612,8 @@
<li><code>Hadoop Common</code> (<code>hadoop-common-{hadoop-version}.jar</code>)</li> <li><code>Hadoop Common</code> (<code>hadoop-common-{hadoop-version}.jar</code>)</li>
<li><code>Apache Commons Logging</code> (<code>commons-logging-1.1.1.jar</code>)</li> <li><code>Apache Commons Logging</code> (<code>commons-logging-1.1.1.jar</code>)</li>
<li><code>Apache Commons CLI</code> (<code>commons-cli-1.2.jar</code>)</li> <li><code>Apache Commons CLI</code> (<code>commons-cli-1.2.jar</code>)</li>
<li><code>Jackson Mapper</code> (<code>jackson-mapper-asl-1.4.2.jar</code>)</li> <li><code>Jackson Mapper</code> (<code>jackson-mapper-asl-1.8.2.jar</code>)</li>
<li><code>Jackson Core</code> (<code>jackson-core-asl-1.4.2.jar</code>)</li> <li><code>Jackson Core</code> (<code>jackson-core-asl-1.8.2.jar</code>)</li>
</ul> </ul>
<note>One simple way to run Rumen is to use '$HADOOP_PREFIX/bin/hadoop jar' <note>One simple way to run Rumen is to use '$HADOOP_PREFIX/bin/hadoop jar'

View File

@ -20,12 +20,8 @@ package org.apache.hadoop.tools.rumen;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;

View File

@ -961,11 +961,11 @@ public class TestRumenJobTraces {
for (LoggedNetworkTopology rack : racks) { for (LoggedNetworkTopology rack : racks) {
List<LoggedNetworkTopology> nodes = rack.getChildren(); List<LoggedNetworkTopology> nodes = rack.getChildren();
if (rack.getName().endsWith(".64")) { if (rack.getName().getValue().endsWith(".64")) {
assertEquals("The singleton rack has the wrong number of elements", 1, assertEquals("The singleton rack has the wrong number of elements", 1,
nodes.size()); nodes.size());
sawSingleton = true; sawSingleton = true;
} else if (rack.getName().endsWith(".80")) { } else if (rack.getName().getValue().endsWith(".80")) {
assertEquals("The doubleton rack has the wrong number of elements", 2, assertEquals("The doubleton rack has the wrong number of elements", 2,
nodes.size()); nodes.size());
sawDoubleton = true; sawDoubleton = true;

View File

@ -0,0 +1,273 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.tools.rumen.datatypes.*;
import org.apache.hadoop.tools.rumen.serializers.*;
import org.apache.hadoop.tools.rumen.state.*;
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.module.SimpleModule;
public class Anonymizer extends Configured implements Tool {
private boolean anonymizeTrace = false;
private Path inputTracePath = null;
private Path outputTracePath = null;
private boolean anonymizeTopology = false;
private Path inputTopologyPath = null;
private Path outputTopologyPath = null;
//TODO Make this final if not using JSON
// private final StatePool statePool = new StatePool();
private StatePool statePool;
private ObjectMapper outMapper = null;
private JsonFactory outFactory = null;
private void initialize(String[] args) throws Exception {
try {
for (int i = 0; i < args.length; ++i) {
if ("-trace".equals(args[i])) {
anonymizeTrace = true;
inputTracePath = new Path(args[i+1]);
outputTracePath = new Path(args[i+2]);
i +=2;
}
if ("-topology".equals(args[i])) {
anonymizeTopology = true;
inputTopologyPath = new Path(args[i+1]);
outputTopologyPath = new Path(args[i+2]);
i +=2;
}
}
} catch (Exception e) {
throw new IllegalArgumentException("Illegal arguments list!", e);
}
if (!anonymizeTopology && !anonymizeTrace) {
throw new IllegalArgumentException("Invalid arguments list!");
}
statePool = new StatePool();
// initialize the state manager after the anonymizers are registered
statePool.initialize(getConf());
outMapper = new ObjectMapper();
// define a module
SimpleModule module = new SimpleModule("Anonymization Serializer",
new Version(0, 1, 1, "FINAL"));
// add various serializers to the module
// use the default (as-is) serializer for default data types
module.addSerializer(DataType.class, new DefaultRumenSerializer());
// use a blocking serializer for Strings as they can contain sensitive
// information
module.addSerializer(String.class, new BlockingSerializer());
// use object.toString() for object of type ID
module.addSerializer(ID.class, new ObjectStringSerializer<ID>());
// use getAnonymizedValue() for data types that have the anonymizing
// feature
module.addSerializer(AnonymizableDataType.class,
new DefaultAnonymizingRumenSerializer(statePool, getConf()));
// register the module with the object-mapper
outMapper.registerModule(module);
outFactory = outMapper.getJsonFactory();
}
// anonymize the job trace file
private void anonymizeTrace() throws Exception {
if (anonymizeTrace) {
System.out.println("Anonymizing trace file: " + inputTracePath);
JobTraceReader reader = null;
JsonGenerator outGen = null;
Configuration conf = getConf();
try {
// create a generator
outGen = createJsonGenerator(conf, outputTracePath);
// define the input trace reader
reader = new JobTraceReader(inputTracePath, conf);
// read the plain unanonymized logged job
LoggedJob job = reader.getNext();
while (job != null) {
// write it via an anonymizing channel
outGen.writeObject(job);
// read the next job
job = reader.getNext();
}
System.out.println("Anonymized trace file: " + outputTracePath);
} finally {
if (outGen != null) {
outGen.close();
}
if (reader != null) {
reader.close();
}
}
}
}
// anonymize the cluster topology file
private void anonymizeTopology() throws Exception {
if (anonymizeTopology) {
System.out.println("Anonymizing topology file: " + inputTopologyPath);
ClusterTopologyReader reader = null;
JsonGenerator outGen = null;
Configuration conf = getConf();
try {
// create a generator
outGen = createJsonGenerator(conf, outputTopologyPath);
// define the input cluster topology reader
reader = new ClusterTopologyReader(inputTopologyPath, conf);
// read the plain unanonymized logged job
LoggedNetworkTopology job = reader.get();
// write it via an anonymizing channel
outGen.writeObject(job);
System.out.println("Anonymized topology file: " + outputTopologyPath);
} finally {
if (outGen != null) {
outGen.close();
}
}
}
}
// Creates a JSON generator
private JsonGenerator createJsonGenerator(Configuration conf, Path path)
throws IOException {
FileSystem outFS = path.getFileSystem(conf);
CompressionCodec codec =
new CompressionCodecFactory(conf).getCodec(path);
OutputStream output;
Compressor compressor = null;
if (codec != null) {
compressor = CodecPool.getCompressor(codec);
output = codec.createOutputStream(outFS.create(path), compressor);
} else {
output = outFS.create(path);
}
JsonGenerator outGen = outFactory.createJsonGenerator(output,
JsonEncoding.UTF8);
outGen.useDefaultPrettyPrinter();
return outGen;
}
@Override
public int run(String[] args) throws Exception {
try {
initialize(args);
} catch (Exception e) {
e.printStackTrace();
printUsage();
return -1;
}
return run();
}
/**
* Runs the actual anonymization tool.
*/
public int run() throws Exception {
try {
anonymizeTrace();
} catch (IOException ioe) {
System.err.println("Error running the trace anonymizer!");
ioe.printStackTrace();
System.out.println("\n\nAnonymization unsuccessful!");
return -1;
}
try {
anonymizeTopology();
} catch (IOException ioe) {
System.err.println("Error running the cluster topology anonymizer!");
ioe.printStackTrace();
System.out.println("\n\nAnonymization unsuccessful!");
return -1;
}
statePool.persist();
System.out.println("Anonymization completed successfully!");
return 0;
}
private static void printUsage() {
System.out.println("\nUsage:-");
System.out.print(" Anonymizer");
System.out.print(" [-trace <input-trace-path> <output-trace-path>]");
System.out.println(" [-topology <input-topology-path> "
+ "<output-topology-path>] ");
System.out.print("\n");
}
/**
* The main driver program to use the anonymization utility.
* @param args
*/
public static void main(String[] args) {
Anonymizer instance = new Anonymizer();
int result = 0;
try {
result = ToolRunner.run(instance, args);
} catch (Exception e) {
e.printStackTrace(System.err);
System.exit(-1);
}
if (result != 0) {
System.exit(result);
}
return;
}
}

View File

@ -35,23 +35,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
public class Folder extends Configured implements Tool { public class Folder extends Configured implements Tool {
private long outputDuration = -1; private long outputDuration = -1;
private long inputCycle = -1; private long inputCycle = -1;
@ -66,7 +55,7 @@ public class Folder extends Configured implements Tool {
static final private Log LOG = LogFactory.getLog(Folder.class); static final private Log LOG = LogFactory.getLog(Folder.class);
private DeskewedJobTraceReader reader = null; private DeskewedJobTraceReader reader = null;
private JsonGenerator outGen = null; private Outputter<LoggedJob> outGen = null;
private List<Path> tempPaths = new LinkedList<Path>(); private List<Path> tempPaths = new LinkedList<Path>();
@ -171,25 +160,8 @@ public class Folder extends Configured implements Tool {
skewBufferLength, !allowMissorting); skewBufferLength, !allowMissorting);
Path outPath = new Path(outputPathName); Path outPath = new Path(outputPathName);
ObjectMapper outMapper = new ObjectMapper(); outGen = new DefaultOutputter<LoggedJob>();
outMapper.configure( outGen.init(outPath, conf);
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
JsonFactory outFactory = outMapper.getJsonFactory();
FileSystem outFS = outPath.getFileSystem(conf);
CompressionCodec codec =
new CompressionCodecFactory(conf).getCodec(outPath);
OutputStream output;
Compressor compressor = null;
if (codec != null) {
compressor = CodecPool.getCompressor(codec);
output = codec.createOutputStream(outFS.create(outPath), compressor);
} else {
output = outFS.create(outPath);
}
outGen = outFactory.createJsonGenerator(output, JsonEncoding.UTF8);
outGen.useDefaultPrettyPrinter();
tempDir = tempDir =
tempDirName == null ? outPath.getParent() : new Path(tempDirName); tempDirName == null ? outPath.getParent() : new Path(tempDirName);
@ -258,11 +230,6 @@ public class Folder extends Configured implements Tool {
} }
} }
ObjectMapper outMapper = new ObjectMapper();
outMapper.configure(
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
JsonFactory outFactory = outMapper.getJsonFactory();
// we initialize an empty heap so if we take an error before establishing // we initialize an empty heap so if we take an error before establishing
// a real one the finally code goes through // a real one the finally code goes through
Queue<Pair<LoggedJob, JobTraceReader>> heap = Queue<Pair<LoggedJob, JobTraceReader>> heap =
@ -310,8 +277,7 @@ public class Folder extends Configured implements Tool {
long currentIntervalEnd = Long.MIN_VALUE; long currentIntervalEnd = Long.MIN_VALUE;
Path nextSegment = null; Path nextSegment = null;
OutputStream tempUncompOut = null; Outputter<LoggedJob> tempGen = null;
JsonGenerator tempGen = null;
if (debug) { if (debug) {
LOG.debug("The first job has a submit time of " + firstJobSubmitTime); LOG.debug("The first job has a submit time of " + firstJobSubmitTime);
@ -333,7 +299,9 @@ public class Folder extends Configured implements Tool {
if (tempGen != null) { if (tempGen != null) {
tempGen.close(); tempGen.close();
} }
for (int i = 0; i < 3 && tempUncompOut == null; ++i) {
nextSegment = null;
for (int i = 0; i < 3 && nextSegment == null; ++i) {
try { try {
nextSegment = nextSegment =
new Path(tempDir, "segment-" + tempNameGenerator.nextLong() new Path(tempDir, "segment-" + tempNameGenerator.nextLong()
@ -347,7 +315,7 @@ public class Folder extends Configured implements Tool {
try { try {
if (!fs.exists(nextSegment)) { if (!fs.exists(nextSegment)) {
tempUncompOut = fs.create(nextSegment, false); break;
} }
continue; continue;
@ -360,6 +328,10 @@ public class Folder extends Configured implements Tool {
} }
} }
if (nextSegment == null) {
throw new RuntimeException("Failed to create a new file!");
}
if (debug) { if (debug) {
LOG.debug("Creating " + nextSegment LOG.debug("Creating " + nextSegment
+ " for a job with a submit time of " + job.getSubmitTime()); + " for a job with a submit time of " + job.getSubmitTime());
@ -369,23 +341,8 @@ public class Folder extends Configured implements Tool {
tempPaths.add(nextSegment); tempPaths.add(nextSegment);
CompressionCodec codec = tempGen = new DefaultOutputter<LoggedJob>();
new CompressionCodecFactory(conf).getCodec(nextSegment); tempGen.init(nextSegment, conf);
OutputStream output;
Compressor compressor = null;
if (codec != null) {
compressor = CodecPool.getCompressor(codec);
output = codec.createOutputStream(tempUncompOut, compressor);
} else {
output = tempUncompOut;
}
tempUncompOut = null;
tempGen = outFactory.createJsonGenerator(output, JsonEncoding.UTF8);
if (debug) {
tempGen.useDefaultPrettyPrinter();
}
long currentIntervalNumber = long currentIntervalNumber =
(job.getSubmitTime() - firstJobSubmitTime) / inputCycle; (job.getSubmitTime() - firstJobSubmitTime) / inputCycle;
@ -396,7 +353,9 @@ public class Folder extends Configured implements Tool {
// the temp files contain UDadjusted times, but each temp file's // the temp files contain UDadjusted times, but each temp file's
// content is in the same input cycle interval. // content is in the same input cycle interval.
tempGen.writeObject(job); if (tempGen != null) {
tempGen.output(job);
}
job = reader.nextJob(); job = reader.nextJob();
} }
@ -541,11 +500,11 @@ public class Folder extends Configured implements Tool {
private void maybeOutput(LoggedJob job) throws IOException { private void maybeOutput(LoggedJob job) throws IOException {
for (int i = 0; i < transcriptionRateInteger; ++i) { for (int i = 0; i < transcriptionRateInteger; ++i) {
outGen.writeObject(job); outGen.output(job);
} }
if (random.nextDouble() < transcriptionRateFraction) { if (random.nextDouble() < transcriptionRateFraction) {
outGen.writeObject(job); outGen.output(job);
} }
} }

View File

@ -56,12 +56,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.Decompressor;
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
/** /**
* This is the main class for rumen log mining functionality. * This is the main class for rumen log mining functionality.
@ -126,7 +121,7 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
*/ */
private boolean omitTaskDetails = false; private boolean omitTaskDetails = false;
private JsonGenerator jobTraceGen = null; private Outputter<LoggedJob> jobTraceGen = null;
private boolean prettyprintTrace = true; private boolean prettyprintTrace = true;
@ -148,7 +143,7 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
private int[] attemptTimesPercentiles; private int[] attemptTimesPercentiles;
private JsonGenerator topologyGen = null; private Outputter<LoggedNetworkTopology> topologyGen = null;
private HashSet<ParsedHost> allHosts = new HashSet<ParsedHost>(); private HashSet<ParsedHost> allHosts = new HashSet<ParsedHost>();
@ -502,28 +497,12 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
} }
if (jobTraceFilename != null) { if (jobTraceFilename != null) {
ObjectMapper jmapper = new ObjectMapper(); jobTraceGen = new DefaultOutputter<LoggedJob>();
jmapper.configure( jobTraceGen.init(jobTraceFilename, getConf());
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
JsonFactory jfactory = jmapper.getJsonFactory();
FileSystem jobFS = jobTraceFilename.getFileSystem(getConf());
jobTraceGen =
jfactory.createJsonGenerator(jobFS.create(jobTraceFilename),
JsonEncoding.UTF8);
if (prettyprintTrace) {
jobTraceGen.useDefaultPrettyPrinter();
}
if (topologyFilename != null) { if (topologyFilename != null) {
ObjectMapper tmapper = new ObjectMapper(); topologyGen = new DefaultOutputter<LoggedNetworkTopology>();
tmapper.configure( topologyGen.init(topologyFilename, getConf());
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
JsonFactory tfactory = tmapper.getJsonFactory();
FileSystem topoFS = topologyFilename.getFileSystem(getConf());
topologyGen =
tfactory.createJsonGenerator(topoFS.create(topologyFilename),
JsonEncoding.UTF8);
topologyGen.useDefaultPrettyPrinter();
} }
} }
@ -795,8 +774,8 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
*/ */
if (jobID != null if (jobID != null
&& jobTraceGen != null && jobTraceGen != null
&& (jobBeingTraced == null || !jobID.equals(jobBeingTraced && (jobBeingTraced == null
.getJobID()))) { || !jobID.equals(jobBeingTraced.getJobID().toString()))) {
// push out the old job if there is one, even though it did't get // push out the old job if there is one, even though it did't get
// mated // mated
// with a conf. // with a conf.
@ -1621,7 +1600,7 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
private void maybeMateJobAndConf() throws IOException { private void maybeMateJobAndConf() throws IOException {
if (jobBeingTraced != null && jobconf != null if (jobBeingTraced != null && jobconf != null
&& jobBeingTraced.getJobID().equals(jobconf.jobID)) { && jobBeingTraced.getJobID().toString().equals(jobconf.jobID)) {
jobBeingTraced.setHeapMegabytes(jobconf.heapMegabytes); jobBeingTraced.setHeapMegabytes(jobconf.heapMegabytes);
jobBeingTraced.setQueue(jobconf.queue); jobBeingTraced.setQueue(jobconf.queue);
@ -1698,9 +1677,7 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
jobBeingTraced.setMapperTriesToSucceed(null); jobBeingTraced.setMapperTriesToSucceed(null);
} }
jobTraceGen.writeObject(jobBeingTraced); jobTraceGen.output(jobBeingTraced);
jobTraceGen.writeRaw("\n");
jobBeingTraced = null; jobBeingTraced = null;
} }
@ -1798,7 +1775,7 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
if (topologyGen != null) { if (topologyGen != null) {
LoggedNetworkTopology topo = LoggedNetworkTopology topo =
new LoggedNetworkTopology(allHosts, "<root>", 0); new LoggedNetworkTopology(allHosts, "<root>", 0);
topologyGen.writeObject(topo); topologyGen.output(topo);
topologyGen.close(); topologyGen.close();
} }

View File

@ -27,6 +27,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** /**
@ -83,11 +85,6 @@ public class JobBuilder {
* The number of splits a task can have, before we ignore them all. * The number of splits a task can have, before we ignore them all.
*/ */
private final static int MAXIMUM_PREFERRED_LOCATIONS = 25; private final static int MAXIMUM_PREFERRED_LOCATIONS = 25;
/**
* The regular expression used to parse task attempt IDs in job tracker logs
*/
private final static Pattern taskAttemptIDPattern =
Pattern.compile(".*_([0-9]+)");
private int[] attemptTimesPercentiles = null; private int[] attemptTimesPercentiles = null;
@ -262,7 +259,9 @@ public class JobBuilder {
finalized = true; finalized = true;
// set the conf // set the conf
result.setJobProperties(jobConfigurationParameters); if (jobConfigurationParameters != null) {
result.setJobProperties(jobConfigurationParameters);
}
// initialize all the per-job statistics gathering places // initialize all the per-job statistics gathering places
Histogram[] successfulMapAttemptTimes = Histogram[] successfulMapAttemptTimes =
@ -314,20 +313,10 @@ public class JobBuilder {
} }
} }
String attemptID = attempt.getAttemptID(); TaskAttemptID attemptID = attempt.getAttemptID();
if (attemptID != null) { if (attemptID != null) {
Matcher matcher = taskAttemptIDPattern.matcher(attemptID); successfulNthMapperAttempts.enter(attemptID.getId());
if (matcher.matches()) {
String attemptNumberString = matcher.group(1);
if (attemptNumberString != null) {
int attemptNumber = Integer.parseInt(attemptNumberString);
successfulNthMapperAttempts.enter(attemptNumber);
}
}
} }
} else { } else {
if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) { if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) {

View File

@ -21,10 +21,16 @@ import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.tools.rumen.datatypes.DataType;
import org.apache.hadoop.tools.rumen.serializers.DefaultRumenSerializer;
import org.apache.hadoop.tools.rumen.serializers.ObjectStringSerializer;
import org.codehaus.jackson.JsonEncoding; import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig; import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.module.SimpleModule;
/** /**
* Simple wrapper around {@link JsonGenerator} to write objects in JSON format. * Simple wrapper around {@link JsonGenerator} to write objects in JSON format.
@ -37,6 +43,19 @@ public class JsonObjectMapperWriter<T> implements Closeable {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
mapper.configure( mapper.configure(
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
// define a module
SimpleModule module = new SimpleModule("Default Serializer",
new Version(0, 1, 1, "FINAL"));
// add various serializers to the module
// add default (all-pass) serializer for all rumen specific data types
module.addSerializer(DataType.class, new DefaultRumenSerializer());
// add a serializer to use object.toString() while serializing
module.addSerializer(ID.class, new ObjectStringSerializer<ID>());
// register the module with the object-mapper
mapper.registerModule(module);
mapper.getJsonFactory(); mapper.getJsonFactory();
writer = mapper.getJsonFactory().createJsonGenerator( writer = mapper.getJsonFactory().createJsonGenerator(
output, JsonEncoding.UTF8); output, JsonEncoding.UTF8);

View File

@ -27,6 +27,8 @@ import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.rumen.datatypes.*;
import org.codehaus.jackson.annotate.JsonAnySetter; import org.codehaus.jackson.annotate.JsonAnySetter;
/** /**
@ -50,8 +52,8 @@ public class LoggedJob implements DeepCompare {
static private Set<String> alreadySeenAnySetterAttributes = static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>(); new TreeSet<String>();
String jobID; JobID jobID;
String user; UserName user;
long computonsPerMapInputByte = -1L; long computonsPerMapInputByte = -1L;
long computonsPerMapOutputByte = -1L; long computonsPerMapOutputByte = -1L;
long computonsPerReduceInputByte = -1L; long computonsPerReduceInputByte = -1L;
@ -80,9 +82,9 @@ public class LoggedJob implements DeepCompare {
LoggedDiscreteCDF successfulReduceAttemptCDF; LoggedDiscreteCDF successfulReduceAttemptCDF;
LoggedDiscreteCDF failedReduceAttemptCDF; LoggedDiscreteCDF failedReduceAttemptCDF;
String queue = null; QueueName queue = null;
String jobName = null; JobName jobName = null;
int clusterMapMB = -1; int clusterMapMB = -1;
int clusterReduceMB = -1; int clusterReduceMB = -1;
@ -94,7 +96,7 @@ public class LoggedJob implements DeepCompare {
double[] mapperTriesToSucceed; double[] mapperTriesToSucceed;
double failedMapperFraction; // !!!!! double failedMapperFraction; // !!!!!
private Properties jobProperties = new Properties(); private JobProperties jobProperties = new JobProperties();
LoggedJob() { LoggedJob() {
@ -110,13 +112,13 @@ public class LoggedJob implements DeepCompare {
* Set the configuration properties of the job. * Set the configuration properties of the job.
*/ */
void setJobProperties(Properties conf) { void setJobProperties(Properties conf) {
this.jobProperties = conf; this.jobProperties = new JobProperties(conf);
} }
/** /**
* Get the configuration properties of the job. * Get the configuration properties of the job.
*/ */
public Properties getJobProperties() { public JobProperties getJobProperties() {
return jobProperties; return jobProperties;
} }
@ -138,7 +140,6 @@ public class LoggedJob implements DeepCompare {
} }
} }
@SuppressWarnings("unused")
// for input parameter ignored. // for input parameter ignored.
@JsonAnySetter @JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) { public void setUnknownAttribute(String attributeName, Object ignored) {
@ -149,20 +150,20 @@ public class LoggedJob implements DeepCompare {
} }
} }
public String getUser() { public UserName getUser() {
return user; return user;
} }
void setUser(String user) { void setUser(String user) {
this.user = user; this.user = new UserName(user);
} }
public String getJobID() { public JobID getJobID() {
return jobID; return jobID;
} }
void setJobID(String jobID) { void setJobID(String jobID) {
this.jobID = jobID; this.jobID = JobID.forName(jobID);
} }
public JobPriority getPriority() { public JobPriority getPriority() {
@ -359,20 +360,20 @@ public class LoggedJob implements DeepCompare {
this.relativeTime = relativeTime; this.relativeTime = relativeTime;
} }
public String getQueue() { public QueueName getQueue() {
return queue; return queue;
} }
void setQueue(String queue) { void setQueue(String queue) {
this.queue = queue; this.queue = new QueueName(queue);
} }
public String getJobName() { public JobName getJobName() {
return jobName; return jobName;
} }
void setJobName(String jobName) { void setJobName(String jobName) {
this.jobName = jobName; this.jobName = new JobName(jobName);
} }
public int getClusterMapMB() { public int getClusterMapMB() {
@ -555,35 +556,54 @@ public class LoggedJob implements DeepCompare {
} }
} }
private void compareJobProperties(Properties prop1, Properties prop2, private void compareJobProperties(JobProperties jprop1, JobProperties jprop2,
TreePath loc, String eltname) TreePath loc, String eltname)
throws DeepInequalityException { throws DeepInequalityException {
if (prop1 == null && prop2 == null) { if (jprop1 == null && jprop2 == null) {
return; return;
} }
if (prop1 == null || prop2 == null) { if (jprop1 == null || jprop2 == null) {
throw new DeepInequalityException(eltname + " miscompared [null]", throw new DeepInequalityException(eltname + " miscompared",
new TreePath(loc, eltname)); new TreePath(loc, eltname));
} }
Properties prop1 = jprop1.getValue();
Properties prop2 = jprop2.getValue();
if (prop1.size() != prop2.size()) { if (prop1.size() != prop2.size()) {
throw new DeepInequalityException(eltname + " miscompared [size]", throw new DeepInequalityException(eltname + " miscompared [size]",
new TreePath(loc, eltname)); new TreePath(loc, eltname));
} }
for (Map.Entry<Object, Object> entry : prop1.entrySet()) { for (Map.Entry<Object, Object> entry : prop1.entrySet()) {
Object v1 = entry.getValue(); String v1 = entry.getValue().toString();
Object v2 = prop2.get(entry.getKey()); String v2 = prop2.get(entry.getKey()).toString();
if (v1 == null || v2 == null || !v1.equals(v2)) { compare1(v1, v2, new TreePath(loc, eltname), "key:" + entry.getKey());
throw new DeepInequalityException(
eltname + " miscompared for value of key : "
+ entry.getKey().toString(),
new TreePath(loc, eltname));
}
} }
} }
private void compare1(DataType<String> c1, DataType<String> c2, TreePath loc,
String eltname)
throws DeepInequalityException {
if (c1 == null && c2 == null) {
return;
}
if (c1 == null || c2 == null) {
throw new DeepInequalityException(eltname + " miscompared",
new TreePath(loc, eltname));
}
TreePath dtPath = new TreePath(loc, eltname);
if (!c1.getClass().getName().equals(c2.getClass().getName())) {
throw new DeepInequalityException(eltname + " miscompared",
new TreePath(dtPath, "class"));
}
compare1(c1.getValue(), c2.getValue(), dtPath, "value");
}
public void deepCompare(DeepCompare comparand, TreePath loc) public void deepCompare(DeepCompare comparand, TreePath loc)
throws DeepInequalityException { throws DeepInequalityException {
if (!(comparand instanceof LoggedJob)) { if (!(comparand instanceof LoggedJob)) {
@ -592,7 +612,7 @@ public class LoggedJob implements DeepCompare {
LoggedJob other = (LoggedJob) comparand; LoggedJob other = (LoggedJob) comparand;
compare1(jobID, other.jobID, loc, "jobID"); compare1(jobID.toString(), other.jobID.toString(), loc, "jobID");
compare1(user, other.user, loc, "user"); compare1(user, other.user, loc, "user");
compare1(computonsPerMapInputByte, other.computonsPerMapInputByte, loc, compare1(computonsPerMapInputByte, other.computonsPerMapInputByte, loc,

View File

@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.hadoop.tools.rumen.datatypes.NodeName;
import org.codehaus.jackson.annotate.JsonAnySetter; import org.codehaus.jackson.annotate.JsonAnySetter;
/** /**
@ -44,20 +45,20 @@ import org.codehaus.jackson.annotate.JsonAnySetter;
* *
*/ */
public class LoggedLocation implements DeepCompare { public class LoggedLocation implements DeepCompare {
static final Map<List<String>, List<String>> layersCache = static final Map<List<String>, List<NodeName>> layersCache =
new HashMap<List<String>, List<String>>(); new HashMap<List<String>, List<NodeName>>();
/** /**
* The full path from the root of the network to the host. * The full path from the root of the network to the host.
* *
* NOTE that this assumes that the network topology is a tree. * NOTE that this assumes that the network topology is a tree.
*/ */
List<String> layers = Collections.emptyList(); List<NodeName> layers = Collections.emptyList();
static private Set<String> alreadySeenAnySetterAttributes = static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>(); new TreeSet<String>();
public List<String> getLayers() { public List<NodeName> getLayers() {
return layers; return layers;
} }
@ -66,16 +67,17 @@ public class LoggedLocation implements DeepCompare {
this.layers = Collections.emptyList(); this.layers = Collections.emptyList();
} else { } else {
synchronized (layersCache) { synchronized (layersCache) {
List<String> found = layersCache.get(layers); List<NodeName> found = layersCache.get(layers);
if (found == null) { if (found == null) {
// make a copy with interned string. // make a copy with interned string.
List<String> clone = new ArrayList<String>(layers.size()); List<NodeName> clone = new ArrayList<NodeName>(layers.size());
for (String s : layers) { clone.add(new NodeName(layers.get(0).intern(), null));
clone.add(s.intern()); clone.add(new NodeName(null, layers.get(1).intern()));
}
// making it read-only as we are sharing them. // making it read-only as we are sharing them.
List<String> readonlyLayers = Collections.unmodifiableList(clone); List<NodeName> readonlyLayers = Collections.unmodifiableList(clone);
layersCache.put(readonlyLayers, readonlyLayers); List<String> readonlyLayersKey = Collections.unmodifiableList(layers);
layersCache.put(readonlyLayersKey, readonlyLayers);
this.layers = readonlyLayers; this.layers = readonlyLayers;
} else { } else {
this.layers = found; this.layers = found;
@ -84,7 +86,6 @@ public class LoggedLocation implements DeepCompare {
} }
} }
@SuppressWarnings("unused")
// for input parameter ignored. // for input parameter ignored.
@JsonAnySetter @JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) { public void setUnknownAttribute(String attributeName, Object ignored) {
@ -96,17 +97,33 @@ public class LoggedLocation implements DeepCompare {
} }
// I'll treat this as an atomic object type // I'll treat this as an atomic object type
private void compareStrings(List<String> c1, List<String> c2, TreePath loc, private void compareStrings(List<NodeName> c1, List<NodeName> c2,
String eltname) throws DeepInequalityException { TreePath loc, String eltname)
throws DeepInequalityException {
if (c1 == null && c2 == null) { if (c1 == null && c2 == null) {
return; return;
} }
TreePath recursePath = new TreePath(loc, eltname); TreePath recursePath = new TreePath(loc, eltname);
if (c1 == null || c2 == null || !c1.equals(c2)) { if (c1 == null || c2 == null || (c1.size() != c2.size())) {
throw new DeepInequalityException(eltname + " miscompared", recursePath); throw new DeepInequalityException(eltname + " miscompared", recursePath);
} }
for (NodeName n1 : c1) {
boolean found = false;
for (NodeName n2 : c2) {
if (n1.getValue().equals(n2.getValue())) {
found = true;
break;
}
}
if (!found) {
throw new DeepInequalityException(eltname
+ " miscompared [" + n1.getValue() +"]", recursePath);
}
}
} }
public void deepCompare(DeepCompare comparand, TreePath loc) public void deepCompare(DeepCompare comparand, TreePath loc)

View File

@ -28,6 +28,7 @@ import java.util.TreeSet;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import org.apache.hadoop.tools.rumen.datatypes.NodeName;
import org.codehaus.jackson.annotate.JsonAnySetter; import org.codehaus.jackson.annotate.JsonAnySetter;
/** /**
@ -40,7 +41,7 @@ import org.codehaus.jackson.annotate.JsonAnySetter;
* *
*/ */
public class LoggedNetworkTopology implements DeepCompare { public class LoggedNetworkTopology implements DeepCompare {
String name; NodeName name;
List<LoggedNetworkTopology> children = new ArrayList<LoggedNetworkTopology>(); List<LoggedNetworkTopology> children = new ArrayList<LoggedNetworkTopology>();
static private Set<String> alreadySeenAnySetterAttributes = static private Set<String> alreadySeenAnySetterAttributes =
@ -50,7 +51,6 @@ public class LoggedNetworkTopology implements DeepCompare {
super(); super();
} }
@SuppressWarnings("unused")
// for input parameter ignored. // for input parameter ignored.
@JsonAnySetter @JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) { public void setUnknownAttribute(String attributeName, Object ignored) {
@ -70,7 +70,7 @@ public class LoggedNetworkTopology implements DeepCompare {
*/ */
static class TopoSort implements Comparator<LoggedNetworkTopology> { static class TopoSort implements Comparator<LoggedNetworkTopology> {
public int compare(LoggedNetworkTopology t1, LoggedNetworkTopology t2) { public int compare(LoggedNetworkTopology t1, LoggedNetworkTopology t2) {
return t1.name.compareTo(t2.name); return t1.name.getValue().compareTo(t2.name.getValue());
} }
} }
@ -83,8 +83,11 @@ public class LoggedNetworkTopology implements DeepCompare {
* the level number * the level number
*/ */
LoggedNetworkTopology(Set<ParsedHost> hosts, String name, int level) { LoggedNetworkTopology(Set<ParsedHost> hosts, String name, int level) {
if (name == null) {
this.name = name; this.name = NodeName.ROOT;
} else {
this.name = new NodeName(name);
}
this.children = null; this.children = null;
if (level < ParsedHost.numberOfDistances() - 1) { if (level < ParsedHost.numberOfDistances() - 1) {
@ -120,15 +123,15 @@ public class LoggedNetworkTopology implements DeepCompare {
} }
LoggedNetworkTopology(Set<ParsedHost> hosts) { LoggedNetworkTopology(Set<ParsedHost> hosts) {
this(hosts, "<root>", 0); this(hosts, null, 0);
} }
public String getName() { public NodeName getName() {
return name; return name;
} }
void setName(String name) { void setName(String name) {
this.name = name; this.name = new NodeName(name);
} }
public List<LoggedNetworkTopology> getChildren() { public List<LoggedNetworkTopology> getChildren() {

View File

@ -23,7 +23,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.hadoop.mapreduce.jobhistory.Events; import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.jobhistory.JhCounter; import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup; import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
import org.apache.hadoop.mapreduce.jobhistory.JhCounters; import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
@ -44,7 +44,7 @@ public class LoggedTask implements DeepCompare {
long inputRecords = -1L; long inputRecords = -1L;
long outputBytes = -1L; long outputBytes = -1L;
long outputRecords = -1L; long outputRecords = -1L;
String taskID; TaskID taskID;
long startTime = -1L; long startTime = -1L;
long finishTime = -1L; long finishTime = -1L;
Pre21JobHistoryConstants.Values taskType; Pre21JobHistoryConstants.Values taskType;
@ -55,7 +55,6 @@ public class LoggedTask implements DeepCompare {
static private Set<String> alreadySeenAnySetterAttributes = static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>(); new TreeSet<String>();
@SuppressWarnings("unused")
// for input parameter ignored. // for input parameter ignored.
@JsonAnySetter @JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) { public void setUnknownAttribute(String attributeName, Object ignored) {
@ -111,12 +110,12 @@ public class LoggedTask implements DeepCompare {
this.outputRecords = outputRecords; this.outputRecords = outputRecords;
} }
public String getTaskID() { public TaskID getTaskID() {
return taskID; return taskID;
} }
void setTaskID(String taskID) { void setTaskID(String taskID) {
this.taskID = taskID; this.taskID = TaskID.forName(taskID);
} }
public long getStartTime() { public long getStartTime() {
@ -357,7 +356,7 @@ public class LoggedTask implements DeepCompare {
compare1(outputBytes, other.outputBytes, loc, "outputBytes"); compare1(outputBytes, other.outputBytes, loc, "outputBytes");
compare1(outputRecords, other.outputRecords, loc, "outputRecords"); compare1(outputRecords, other.outputRecords, loc, "outputRecords");
compare1(taskID, other.taskID, loc, "taskID"); compare1(taskID.toString(), other.taskID.toString(), loc, "taskID");
compare1(startTime, other.startTime, loc, "startTime"); compare1(startTime, other.startTime, loc, "startTime");
compare1(finishTime, other.finishTime, loc, "finishTime"); compare1(finishTime, other.finishTime, loc, "finishTime");

View File

@ -30,9 +30,11 @@ import org.codehaus.jackson.annotate.JsonAnySetter;
// the Jackson implementation of JSON doesn't handle a // the Jackson implementation of JSON doesn't handle a
// superclass-valued field. // superclass-valued field.
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.jobhistory.JhCounter; import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup; import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
import org.apache.hadoop.mapreduce.jobhistory.JhCounters; import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
import org.apache.hadoop.tools.rumen.datatypes.NodeName;
/** /**
* A {@link LoggedTaskAttempt} represents an attempt to run an hadoop task in a * A {@link LoggedTaskAttempt} represents an attempt to run an hadoop task in a
@ -44,11 +46,11 @@ import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
*/ */
public class LoggedTaskAttempt implements DeepCompare { public class LoggedTaskAttempt implements DeepCompare {
String attemptID; TaskAttemptID attemptID;
Pre21JobHistoryConstants.Values result; Pre21JobHistoryConstants.Values result;
long startTime = -1L; long startTime = -1L;
long finishTime = -1L; long finishTime = -1L;
String hostName; NodeName hostName;
long hdfsBytesRead = -1L; long hdfsBytesRead = -1L;
long hdfsBytesWritten = -1L; long hdfsBytesWritten = -1L;
@ -188,7 +190,6 @@ public class LoggedTaskAttempt implements DeepCompare {
static private Set<String> alreadySeenAnySetterAttributes = static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>(); new TreeSet<String>();
@SuppressWarnings("unused")
// for input parameter ignored. // for input parameter ignored.
@JsonAnySetter @JsonAnySetter
public void setUnknownAttribute(String attributeName, Object ignored) { public void setUnknownAttribute(String attributeName, Object ignored) {
@ -292,12 +293,12 @@ public class LoggedTaskAttempt implements DeepCompare {
this.sortFinished = sortFinished; this.sortFinished = sortFinished;
} }
public String getAttemptID() { public TaskAttemptID getAttemptID() {
return attemptID; return attemptID;
} }
void setAttemptID(String attemptID) { void setAttemptID(String attemptID) {
this.attemptID = attemptID; this.attemptID = TaskAttemptID.forName(attemptID);
} }
public Pre21JobHistoryConstants.Values getResult() { public Pre21JobHistoryConstants.Values getResult() {
@ -324,15 +325,17 @@ public class LoggedTaskAttempt implements DeepCompare {
this.finishTime = finishTime; this.finishTime = finishTime;
} }
public String getHostName() { public NodeName getHostName() {
return hostName; return hostName;
} }
// This is needed for JSON deserialization
void setHostName(String hostName) { void setHostName(String hostName) {
this.hostName = hostName; this.hostName = hostName == null ? null : new NodeName(hostName);
} }
// hostName is saved in the format rackName/NodeName // In job-history, hostName is saved in the format rackName/NodeName
//TODO this is a hack! The '/' handling needs fixing.
void setHostName(String hostName, String rackName) { void setHostName(String hostName, String rackName) {
if (hostName == null || hostName.length() == 0) { if (hostName == null || hostName.length() == 0) {
throw new RuntimeException("Invalid entry! Missing hostname"); throw new RuntimeException("Invalid entry! Missing hostname");
@ -649,6 +652,20 @@ public class LoggedTaskAttempt implements DeepCompare {
} }
} }
private void compare1(NodeName c1, NodeName c2, TreePath loc, String eltname)
throws DeepInequalityException {
if (c1 == null && c2 == null) {
return;
}
if (c1 == null || c2 == null) {
throw new DeepInequalityException(eltname + " miscompared", new TreePath(
loc, eltname));
}
compare1(c1.getValue(), c2.getValue(), new TreePath(loc, eltname), "value");
}
private void compare1(long c1, long c2, TreePath loc, String eltname) private void compare1(long c1, long c2, TreePath loc, String eltname)
throws DeepInequalityException { throws DeepInequalityException {
if (c1 != c2) { if (c1 != c2) {
@ -709,7 +726,7 @@ public class LoggedTaskAttempt implements DeepCompare {
LoggedTaskAttempt other = (LoggedTaskAttempt) comparand; LoggedTaskAttempt other = (LoggedTaskAttempt) comparand;
compare1(attemptID, other.attemptID, loc, "attemptID"); compare1(attemptID.toString(), other.attemptID.toString(), loc, "attemptID");
compare1(result, other.result, loc, "result"); compare1(result, other.result, loc, "result");
compare1(startTime, other.startTime, loc, "startTime"); compare1(startTime, other.startTime, loc, "startTime");
compare1(finishTime, other.finishTime, loc, "finishTime"); compare1(finishTime, other.finishTime, loc, "finishTime");

View File

@ -22,7 +22,9 @@ import java.util.List;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.regex.Matcher; import java.util.regex.Matcher;
class ParsedHost { import org.apache.hadoop.tools.rumen.datatypes.NodeName;
public class ParsedHost {
private final String rackName; private final String rackName;
private final String nodeName; private final String nodeName;
@ -76,10 +78,10 @@ class ParsedHost {
} }
public ParsedHost(LoggedLocation loc) { public ParsedHost(LoggedLocation loc) {
List<String> coordinates = loc.getLayers(); List<NodeName> coordinates = loc.getLayers();
rackName = process(coordinates.get(0)); rackName = coordinates.get(0).getRackName();
nodeName = process(coordinates.get(1)); nodeName = coordinates.get(1).getHostName();
} }
LoggedLocation makeLoggedLocation() { LoggedLocation makeLoggedLocation() {
@ -95,11 +97,11 @@ class ParsedHost {
return result; return result;
} }
String getNodeName() { public String getNodeName() {
return nodeName; return nodeName;
} }
String getRackName() { public String getRackName() {
return rackName; return rackName;
} }

View File

@ -124,15 +124,16 @@ public class ZombieCluster extends AbstractClusterStory {
int level = levelMapping.get(n); int level = levelMapping.get(n);
Node current; Node current;
if (level == leafLevel) { // a machine node if (level == leafLevel) { // a machine node
MachineNode.Builder builder = new MachineNode.Builder(n.getName(), level); MachineNode.Builder builder =
new MachineNode.Builder(n.getName().getValue(), level);
if (defaultNode != null) { if (defaultNode != null) {
builder.cloneFrom(defaultNode); builder.cloneFrom(defaultNode);
} }
current = builder.build(); current = builder.build();
} else { } else {
current = (level == leafLevel - 1) current = (level == leafLevel - 1)
? new RackNode(n.getName(), level) : ? new RackNode(n.getName().getValue(), level) :
new Node(n.getName(), level); new Node(n.getName().getValue(), level);
path[level] = current; path[level] = current;
// Add all children to the front of the queue. // Add all children to the front of the queue.
for (LoggedNetworkTopology child : n.getChildren()) { for (LoggedNetworkTopology child : n.getChildren()) {

View File

@ -28,12 +28,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus.State; import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.tools.rumen.datatypes.*;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
/** /**
@ -128,7 +130,7 @@ public class ZombieJob implements JobStory {
// file, are added first because the specialized values obtained from // file, are added first because the specialized values obtained from
// Rumen should override the job conf values. // Rumen should override the job conf values.
// //
for (Map.Entry<Object, Object> entry : job.getJobProperties().entrySet()) { for (Map.Entry<Object, Object> entry : job.getJobProperties().getValue().entrySet()) {
jobConf.set(entry.getKey().toString(), entry.getValue().toString()); jobConf.set(entry.getKey().toString(), entry.getValue().toString());
} }
@ -161,12 +163,12 @@ public class ZombieJob implements JobStory {
List<String> hostList = new ArrayList<String>(); List<String> hostList = new ArrayList<String>();
if (locations != null) { if (locations != null) {
for (LoggedLocation location : locations) { for (LoggedLocation location : locations) {
List<String> layers = location.getLayers(); List<NodeName> layers = location.getLayers();
if (layers.size() == 0) { if (layers.size() == 0) {
LOG.warn("Bad location layer format for task "+mapTask.getTaskID()); LOG.warn("Bad location layer format for task "+mapTask.getTaskID());
continue; continue;
} }
String host = layers.get(layers.size() - 1); String host = layers.get(layers.size() - 1).getValue();
if (host == null) { if (host == null) {
LOG.warn("Bad location layer format for task "+mapTask.getTaskID() + ": " + layers); LOG.warn("Bad location layer format for task "+mapTask.getTaskID() + ": " + layers);
continue; continue;
@ -226,20 +228,20 @@ public class ZombieJob implements JobStory {
@Override @Override
public String getName() { public String getName() {
String jobName = job.getJobName(); JobName jobName = job.getJobName();
if (jobName == null) { if (jobName == null) {
return "(name unknown)"; return "(name unknown)";
} else { } else {
return jobName; return jobName.getValue();
} }
} }
@Override @Override
public JobID getJobID() { public JobID getJobID() {
return JobID.forName(getLoggedJob().getJobID()); return getLoggedJob().getJobID();
} }
private int sanitizeValue(int oldVal, int defaultVal, String name, String id) { private int sanitizeValue(int oldVal, int defaultVal, String name, JobID id) {
if (oldVal == -1) { if (oldVal == -1) {
LOG.warn(name +" not defined for "+id); LOG.warn(name +" not defined for "+id);
return defaultVal; return defaultVal;
@ -269,8 +271,10 @@ public class ZombieJob implements JobStory {
@Override @Override
public String getQueueName() { public String getQueueName() {
String queue = job.getQueue(); QueueName queue = job.getQueue();
return (queue == null)? JobConf.DEFAULT_QUEUE_NAME : queue; return (queue == null || queue.getValue() == null)
? JobConf.DEFAULT_QUEUE_NAME
: queue.getValue();
} }
/** /**
@ -357,13 +361,12 @@ public class ZombieJob implements JobStory {
for (LoggedTask map : job.getMapTasks()) { for (LoggedTask map : job.getMapTasks()) {
map = sanitizeLoggedTask(map); map = sanitizeLoggedTask(map);
if (map != null) { if (map != null) {
loggedTaskMap.put(maskTaskID(TaskID.forName(map.taskID)), map); loggedTaskMap.put(maskTaskID(map.taskID), map);
for (LoggedTaskAttempt mapAttempt : map.getAttempts()) { for (LoggedTaskAttempt mapAttempt : map.getAttempts()) {
mapAttempt = sanitizeLoggedTaskAttempt(mapAttempt); mapAttempt = sanitizeLoggedTaskAttempt(mapAttempt);
if (mapAttempt != null) { if (mapAttempt != null) {
TaskAttemptID id = TaskAttemptID.forName(mapAttempt TaskAttemptID id = mapAttempt.getAttemptID();
.getAttemptID());
loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt); loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt);
} }
} }
@ -372,13 +375,12 @@ public class ZombieJob implements JobStory {
for (LoggedTask reduce : job.getReduceTasks()) { for (LoggedTask reduce : job.getReduceTasks()) {
reduce = sanitizeLoggedTask(reduce); reduce = sanitizeLoggedTask(reduce);
if (reduce != null) { if (reduce != null) {
loggedTaskMap.put(maskTaskID(TaskID.forName(reduce.taskID)), reduce); loggedTaskMap.put(maskTaskID(reduce.taskID), reduce);
for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) { for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) {
reduceAttempt = sanitizeLoggedTaskAttempt(reduceAttempt); reduceAttempt = sanitizeLoggedTaskAttempt(reduceAttempt);
if (reduceAttempt != null) { if (reduceAttempt != null) {
TaskAttemptID id = TaskAttemptID.forName(reduceAttempt TaskAttemptID id = reduceAttempt.getAttemptID();
.getAttemptID());
loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt); loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt);
} }
} }
@ -391,8 +393,10 @@ public class ZombieJob implements JobStory {
@Override @Override
public String getUser() { public String getUser() {
String retval = job.getUser(); UserName retval = job.getUser();
return (retval==null)?"(unknown)":retval; return (retval == null || retval.getValue() == null)
? "(unknown)"
: retval.getValue();
} }
/** /**
@ -511,7 +515,7 @@ public class ZombieJob implements JobStory {
} }
} }
private long sanitizeTaskRuntime(long time, String id) { private long sanitizeTaskRuntime(long time, ID id) {
if (time < 0) { if (time < 0) {
LOG.warn("Negative running time for task "+id+": "+time); LOG.warn("Negative running time for task "+id+": "+time);
return 100L; // set default to 100ms. return 100L; // set default to 100ms.
@ -547,7 +551,7 @@ public class ZombieJob implements JobStory {
private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) { private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
int distance = cluster.getMaximumDistance(); int distance = cluster.getMaximumDistance();
String rackHostName = loggedAttempt.getHostName(); String rackHostName = loggedAttempt.getHostName().getValue();
if (rackHostName == null) { if (rackHostName == null) {
return distance; return distance;
} }
@ -558,11 +562,11 @@ public class ZombieJob implements JobStory {
List<LoggedLocation> locations = loggedTask.getPreferredLocations(); List<LoggedLocation> locations = loggedTask.getPreferredLocations();
if (locations != null) { if (locations != null) {
for (LoggedLocation location : locations) { for (LoggedLocation location : locations) {
List<String> layers = location.getLayers(); List<NodeName> layers = location.getLayers();
if ((layers == null) || (layers.isEmpty())) { if ((layers == null) || (layers.isEmpty())) {
continue; continue;
} }
String dataNodeName = layers.get(layers.size()-1); String dataNodeName = layers.get(layers.size()-1).getValue();
MachineNode dataNode = cluster.getMachineByName(dataNodeName); MachineNode dataNode = cluster.getMachineByName(dataNodeName);
if (dataNode != null) { if (dataNode != null) {
distance = Math.min(distance, cluster.distance(mn, dataNode)); distance = Math.min(distance, cluster.distance(mn, dataNode));
@ -690,8 +694,8 @@ public class ZombieJob implements JobStory {
private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber, private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber,
int taskAttemptNumber) { int taskAttemptNumber) {
return new TaskAttemptID(new TaskID(JobID.forName(job.getJobID()), return new TaskAttemptID(new TaskID(job.getJobID(), taskType, taskNumber),
taskType, taskNumber), taskAttemptNumber); taskAttemptNumber);
} }
private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo, private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo,
@ -704,7 +708,7 @@ public class ZombieJob implements JobStory {
state = makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed()); state = makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed());
runtime = makeUpMapRuntime(state, locality); runtime = makeUpMapRuntime(state, locality);
runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType, runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType,
taskNumber, taskAttemptNumber).toString()); taskNumber, taskAttemptNumber));
TaskAttemptInfo tai TaskAttemptInfo tai
= new MapTaskAttemptInfo(state, taskInfo, runtime, null); = new MapTaskAttemptInfo(state, taskInfo, runtime, null);
return tai; return tai;

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.anonymization;
import org.apache.hadoop.tools.rumen.state.State;
/**
* The data anonymizer interface.
*/
public interface DataAnonymizer<T> {
T anonymize(T data, State state);
}

View File

@ -0,0 +1,106 @@
package org.apache.hadoop.tools.rumen.anonymization;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.tools.rumen.state.State;
/**
* Represents the list of words used in list-backed anonymizers.
*/
public class WordList implements State {
private Map<String, Integer> list = new HashMap<String, Integer>(0);
private boolean isUpdated = false;
private String name;
public WordList() {
this("word");
}
public WordList(String name) {
this.name = name;
}
@Override
public String getName() {
return name;
}
/**
* Adds the specified word to the list if the word is not already added.
*/
public void add(String word) {
if (!contains(word)) {
int index = getSize();
list.put(word, index);
isUpdated = true;
}
}
/**
* Returns 'true' if the list contains the specified word.
*/
public boolean contains(String word) {
return list.containsKey(word);
}
/**
* Returns the index of the specified word in the list.
*/
public int indexOf(String word) {
return list.get(word);
}
/**
* Returns the size of the list.
*/
public int getSize() {
return list.size();
}
/**
* Returns 'true' if the list is updated since creation (and reload).
*/
@Override
public boolean isUpdated() {
return isUpdated;
}
/**
* Setters and getters for Jackson JSON
*/
/**
* Sets the size of the list.
*
* Note: That this API is only for Jackson JSON deserialization.
*/
public void setSize(int size) {
list = new HashMap<String, Integer>(size);
}
/**
* Note: That this API is only for Jackson JSON deserialization.
*/
@Override
public void setName(String name) {
this.name = name;
}
/**
* Gets the words.
*
* Note: That this API is only for Jackson JSON serialization.
*/
public Map<String, Integer> getWords() {
return list;
}
/**
* Sets the words.
*
* Note: That this API is only for Jackson JSON deserialization.
*/
public void setWords(Map<String, Integer> list) {
this.list = list;
}
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.anonymization;
import org.apache.commons.lang.StringUtils;
/**
* Utility class to handle commonly performed tasks in a
* {@link org.apache.hadoop.tools.rumen.datatypes.DefaultAnonymizableDataType}
* using a {@link WordList} for anonymization.
* //TODO There is no caching for saving memory.
*/
public class WordListAnonymizerUtility {
public static final String[] KNOWN_WORDS =
new String[] {"job", "tmp", "temp", "home", "homes", "usr", "user", "test"};
/**
* Checks if the data needs anonymization. Typically, data types which are
* numeric in nature doesn't need anonymization.
*/
public static boolean needsAnonymization(String data) {
// Numeric data doesn't need anonymization
// Currently this doesnt support inputs like
// - 12.3
// - 12.3f
// - 90L
// - 1D
if (StringUtils.isNumeric(data)) {
return false;
}
return true; // by default return true
}
/**
* Checks if the given data has a known suffix.
*/
public static boolean hasSuffix(String data, String[] suffixes) {
// check if they end in known suffixes
for (String ks : suffixes) {
if (data.endsWith(ks)) {
return true;
}
}
return false;
}
/**
* Extracts a known suffix from the given data.
*
* @throws RuntimeException if the data doesn't have a suffix.
* Use {@link #hasSuffix(String, String[])} to make sure that the
* given data has a suffix.
*/
public static String[] extractSuffix(String data, String[] suffixes) {
// check if they end in known suffixes
String suffix = "";
for (String ks : suffixes) {
if (data.endsWith(ks)) {
suffix = ks;
// stripe off the suffix which will get appended later
data = data.substring(0, data.length() - suffix.length());
return new String[] {data, suffix};
}
}
// throw exception
throw new RuntimeException("Data [" + data + "] doesn't have a suffix from"
+ " known suffixes [" + StringUtils.join(suffixes, ',') + "]");
}
/**
* Checks if the given data is known. This API uses {@link #KNOWN_WORDS} to
* detect if the given data is a commonly used (so called 'known') word.
*/
public static boolean isKnownData(String data) {
return isKnownData(data, KNOWN_WORDS);
}
/**
* Checks if the given data is known.
*/
public static boolean isKnownData(String data, String[] knownWords) {
// check if the data is known content
//TODO [Chunking] Do this for sub-strings of data
for (String kd : knownWords) {
if (data.equals(kd)) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.rumen.state.StatePool;
/**
* An interface for data-types that can be anonymized.
*/
public interface AnonymizableDataType<T> extends DataType<T> {
public T getAnonymizedValue(StatePool statePool, Configuration conf);
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes;
import org.apache.hadoop.conf.Configuration;
/**
* Represents a class name.
*/
public class ClassName extends DefaultAnonymizableDataType {
public static final String CLASSNAME_PRESERVE_CONFIG = "rumen.data-types.classname.preserve";
private final String className;
public ClassName(String className) {
super();
this.className = className;
}
@Override
public String getValue() {
return className;
}
@Override
protected String getPrefix() {
return "class";
}
@Override
protected boolean needsAnonymization(Configuration conf) {
String[] preserves = conf.getStrings(CLASSNAME_PRESERVE_CONFIG);
if (preserves != null) {
// do a simple starts with check
for (String p : preserves) {
if (className.startsWith(p)) {
return false;
}
}
}
return true;
}
}

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes;
/**
* Represents a Rumen data-type.
*/
public interface DataType<T> {
T getValue();
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.rumen.anonymization.WordList;
import org.apache.hadoop.tools.rumen.anonymization.WordListAnonymizerUtility;
import org.apache.hadoop.tools.rumen.state.StatePool;
/**
* Represents a default anonymizable Rumen data-type. It uses
* {@link WordListAnonymizerUtility} for anonymization.
*/
public abstract class DefaultAnonymizableDataType
implements AnonymizableDataType<String> {
private static final String DEFAULT_PREFIX = "data";
protected String getPrefix() {
return DEFAULT_PREFIX;
}
// Determines if the contained data needs anonymization
protected boolean needsAnonymization(Configuration conf) {
return true;
}
@Override
public final String getAnonymizedValue(StatePool statePool,
Configuration conf) {
if (needsAnonymization(conf)) {
WordList state = (WordList) statePool.getState(getClass());
if (state == null) {
state = new WordList(getPrefix());
statePool.addState(getClass(), state);
}
return anonymize(getValue(), state);
} else {
return getValue();
}
}
private static String anonymize(String data, WordList wordList) {
if (data == null) {
return null;
}
if (!wordList.contains(data)) {
wordList.add(data);
}
return wordList.getName() + wordList.indexOf(data);
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes;
/**
* This represents the default java data-types (like int, long, float etc).
*/
public class DefaultDataType implements DataType<String> {
private String value;
public DefaultDataType(String value) {
this.value = value;
}
/**
* Get the value of the attribute.
*/
@Override
public String getValue() {
return value;
}
}

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.anonymization.WordList;
import org.apache.hadoop.tools.rumen.anonymization.WordListAnonymizerUtility;
import org.apache.hadoop.tools.rumen.state.State;
import org.apache.hadoop.tools.rumen.state.StatePool;
import org.apache.hadoop.util.StringUtils;
/**
* Represents a file's location.
*
* Currently, only filenames that can be represented using {@link Path} are
* supported.
*/
public class FileName implements AnonymizableDataType<String> {
private final String fileName;
private String anonymizedFileName;
private static final String PREV_DIR = "..";
private static final String[] KNOWN_SUFFIXES =
new String[] {".xml", ".jar", ".txt", ".tar", ".zip", ".json", ".gzip",
".lzo"};
/**
* A composite state for filename.
*/
public static class FileNameState implements State {
private WordList dirState = new WordList("dir");
private WordList fileNameState = new WordList("file");
@Override
public boolean isUpdated() {
return dirState.isUpdated() || fileNameState.isUpdated();
}
public WordList getDirectoryState() {
return dirState;
}
public WordList getFileNameState() {
return fileNameState;
}
public void setDirectoryState(WordList state) {
this.dirState = state;
}
public void setFileNameState(WordList state) {
this.fileNameState = state;
}
@Override
public String getName() {
return "path";
}
@Override
public void setName(String name) {
// for now, simply assert since this class has a hardcoded name
if (!getName().equals(name)) {
throw new RuntimeException("State name mismatch! Expected '"
+ getName() + "' but found '" + name + "'.");
}
}
}
public FileName(String fileName) {
this.fileName = fileName;
}
@Override
public String getValue() {
return fileName;
}
@Override
public String getAnonymizedValue(StatePool statePool,
Configuration conf) {
if (anonymizedFileName == null) {
anonymize(statePool, conf);
}
return anonymizedFileName;
}
private void anonymize(StatePool statePool, Configuration conf) {
FileNameState fState = (FileNameState) statePool.getState(getClass());
if (fState == null) {
fState = new FileNameState();
statePool.addState(getClass(), fState);
}
String[] files = StringUtils.split(fileName);
String[] anonymizedFileNames = new String[files.length];
int i = 0;
for (String f : files) {
anonymizedFileNames[i++] =
anonymize(statePool, conf, fState, f);
}
anonymizedFileName = StringUtils.arrayToString(anonymizedFileNames);
}
private static String anonymize(StatePool statePool, Configuration conf,
FileNameState fState, String fileName) {
String ret = null;
try {
URI uri = new URI(fileName);
// anonymize the path i.e without the authority & scheme
ret =
anonymizePath(uri.getPath(), fState.getDirectoryState(),
fState.getFileNameState());
// anonymize the authority and scheme
String authority = uri.getAuthority();
String scheme = uri.getScheme();
if (scheme != null) {
String anonymizedAuthority = "";
if (authority != null) {
// anonymize the authority
NodeName hostName = new NodeName(null, uri.getHost());
anonymizedAuthority = hostName.getAnonymizedValue(statePool, conf);
}
ret = scheme + "://" + anonymizedAuthority + ret;
}
} catch (URISyntaxException use) {
throw new RuntimeException (use);
}
return ret;
}
// Anonymize the file-path
private static String anonymizePath(String path, WordList dState,
WordList fState) {
StringBuilder buffer = new StringBuilder();
StringTokenizer tokenizer = new StringTokenizer(path, Path.SEPARATOR, true);
while (tokenizer.hasMoreTokens()) {
String token = tokenizer.nextToken();
if (Path.SEPARATOR.equals(token)) {
buffer.append(token);
} else if (Path.CUR_DIR.equals(token)) {
buffer.append(token);
} else if (PREV_DIR.equals(token)) {
buffer.append(token);
} else if (tokenizer.hasMoreTokens()){
// this component is a directory
buffer.append(anonymize(token, dState));
} else {
// this component is a file
buffer.append(anonymize(token, fState));
}
}
return buffer.toString();
}
//TODO There is no caching for saving memory.
private static String anonymize(String data, WordList wordList) {
if (data == null) {
return null;
}
if (WordListAnonymizerUtility.needsAnonymization(data)) {
String suffix = "";
String coreData = data;
// check and extract suffix
if (WordListAnonymizerUtility.hasSuffix(data, KNOWN_SUFFIXES)) {
// check if the data ends with a known suffix
String[] split =
WordListAnonymizerUtility.extractSuffix(data, KNOWN_SUFFIXES);
suffix = split[1];
coreData = split[0];
}
// check if the data is known content
//TODO [Chunking] Do this for sub-strings of data
String anonymizedData = coreData;
if (!WordListAnonymizerUtility.isKnownData(coreData)) {
if (!wordList.contains(coreData)) {
wordList.add(coreData);
}
anonymizedData = wordList.getName() + wordList.indexOf(coreData);
}
return anonymizedData + suffix;
} else {
return data;
}
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes;
/**
* Represents a job's name.
*/
public class JobName extends DefaultAnonymizableDataType {
private final String jobName;
public JobName(String jobName) {
super();
this.jobName = jobName;
}
@Override
public String getValue() {
return jobName;
}
@Override
protected String getPrefix() {
return "job";
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.rumen.datatypes.util.JobPropertyParser;
import org.apache.hadoop.tools.rumen.datatypes.util.MapReduceJobPropertiesParser;
import org.apache.hadoop.tools.rumen.state.StatePool;
import org.apache.hadoop.util.ReflectionUtils;
/**
* This represents the job configuration properties.
*/
public class JobProperties implements AnonymizableDataType<Properties> {
public static final String PARSERS_CONFIG_KEY =
"rumen.datatypes.jobproperties.parsers";
private final Properties jobProperties;
public JobProperties() {
this(new Properties());
}
public JobProperties(Properties properties) {
this.jobProperties = properties;
}
public Properties getValue() {
return jobProperties;
}
@Override
public Properties getAnonymizedValue(StatePool statePool,
Configuration conf) {
Properties filteredProperties = null;
List<JobPropertyParser> pList = new ArrayList<JobPropertyParser>(1);
// load the parsers
String config = conf.get(PARSERS_CONFIG_KEY);
if (config != null) {
@SuppressWarnings("unchecked")
Class<JobPropertyParser>[] parsers =
(Class[])conf.getClasses(PARSERS_CONFIG_KEY);
for (Class<JobPropertyParser> c : parsers) {
JobPropertyParser parser = ReflectionUtils.newInstance(c, conf);
pList.add(parser);
}
} else {
// add the default MapReduce filter
JobPropertyParser parser = new MapReduceJobPropertiesParser();
pList.add(parser);
}
// filter out the desired config key-value pairs
if (jobProperties != null) {
filteredProperties = new Properties();
// define a configuration object and load it with original job properties
for (Map.Entry<Object, Object> entry : jobProperties.entrySet()) {
//TODO Check for null key/value?
String key = entry.getKey().toString();
String value = entry.getValue().toString();
// find a parser for this key
for (JobPropertyParser p : pList) {
DataType<?> pValue = p.parseJobProperty(key, value);
if (pValue != null) {
filteredProperties.put(key, pValue);
break;
}
}
}
}
return filteredProperties;
}
}

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.rumen.ParsedHost;
import org.apache.hadoop.tools.rumen.anonymization.WordList;
import org.apache.hadoop.tools.rumen.state.State;
import org.apache.hadoop.tools.rumen.state.StatePool;
import org.codehaus.jackson.annotate.JsonIgnore;
/**
* Represents the cluster host.
*/
public class NodeName implements AnonymizableDataType<String> {
private String hostName;
private String rackName;
private String nodeName;
private String anonymizedNodeName;
public static final NodeName ROOT = new NodeName("<root>");
/**
* A composite state for node-name.
*/
public static class NodeNameState implements State {
private WordList rackNameState = new WordList("rack");
private WordList hostNameState = new WordList("host");
@Override
@JsonIgnore
public boolean isUpdated() {
return rackNameState.isUpdated() || hostNameState.isUpdated();
}
public WordList getRackNameState() {
return rackNameState;
}
public WordList getHostNameState() {
return hostNameState;
}
public void setRackNameState(WordList state) {
this.rackNameState = state;
}
public void setHostNameState(WordList state) {
this.hostNameState = state;
}
@Override
public String getName() {
return "node";
}
@Override
public void setName(String name) {
// for now, simply assert since this class has a hardcoded name
if (!getName().equals(name)) {
throw new RuntimeException("State name mismatch! Expected '"
+ getName() + "' but found '" + name + "'.");
}
}
}
public NodeName(String nodeName) {
this.nodeName = nodeName;
ParsedHost pHost = ParsedHost.parse(nodeName);
if (pHost == null) {
this.rackName = null;
this.hostName = nodeName;
} else {
//TODO check for null and improve .. possibly call NodeName(r,h)
this.rackName = pHost.getRackName();
this.hostName = pHost.getNodeName();
}
}
public NodeName(String rName, String hName) {
rName = (rName == null)
? rName
: rName.length() == 0
? null
: rName;
hName = (hName == null)
? hName
: hName.length() == 0
? null
: hName;
if (hName == null) {
nodeName = rName;
rackName = rName;
} else if (rName == null) {
nodeName = hName;
ParsedHost pHost = ParsedHost.parse(nodeName);
if (pHost == null) {
this.rackName = null;
this.hostName = hName;
} else {
this.rackName = pHost.getRackName();
this.hostName = pHost.getNodeName();
}
} else {
rackName = rName;
this.hostName = hName;
this.nodeName = "/" + rName + "/" + hName;
}
}
public String getHostName() {
return hostName;
}
public String getRackName() {
return rackName;
}
@Override
public String getValue() {
return nodeName;
}
@Override
public String getAnonymizedValue(StatePool statePool, Configuration conf) {
if (this.getValue().equals(ROOT.getValue())) {
return getValue();
}
if (anonymizedNodeName == null) {
anonymize(statePool);
}
return anonymizedNodeName;
}
private void anonymize(StatePool pool) {
StringBuffer buf = new StringBuffer();
NodeNameState state = (NodeNameState) pool.getState(getClass());
if (state == null) {
state = new NodeNameState();
pool.addState(getClass(), state);
}
if (rackName != null && hostName != null) {
buf.append('/');
buf.append(anonymize(rackName, state.getRackNameState()));
buf.append('/');
buf.append(anonymize(hostName, state.getHostNameState()));
} else {
if (state.getRackNameState().contains(nodeName) || rackName != null) {
buf.append(anonymize(nodeName, state.getRackNameState()));
} else {
buf.append(anonymize(nodeName, state.getHostNameState()));
}
}
anonymizedNodeName = buf.toString();
}
//TODO There is no caching for saving memory.
private static String anonymize(String data, WordList wordList) {
if (data == null) {
return null;
}
if (!wordList.contains(data)) {
wordList.add(data);
}
return wordList.getName() + wordList.indexOf(data);
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes;
/**
* Represents a queue name.
*/
public class QueueName extends DefaultAnonymizableDataType {
private final String queueName;
public QueueName(String queueName) {
super();
this.queueName = queueName;
}
@Override
public String getValue() {
return queueName;
}
@Override
protected String getPrefix() {
return "queue";
};
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes;
/**
* Represents a user's name.
*/
public class UserName extends DefaultAnonymizableDataType {
private final String userName;
public UserName(String userName) {
super();
this.userName = userName;
}
@Override
public String getValue() {
return userName;
}
@Override
protected String getPrefix() {
return "user";
}
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes.util;
import org.apache.hadoop.tools.rumen.datatypes.DataType;
import org.apache.hadoop.tools.rumen.datatypes.DefaultDataType;
/**
* A simple job property parser that acts like a pass-through filter.
*/
public class DefaultJobPropertiesParser implements JobPropertyParser {
@Override
public DataType<?> parseJobProperty(String key, String value) {
return new DefaultDataType(value);
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes.util;
import org.apache.hadoop.tools.rumen.datatypes.DataType;
import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
/**
* A {@link JobProperties} parsing utility.
*/
public interface JobPropertyParser {
/**
* Parse the specified job configuration key-value pair.
*
* @return Returns a {@link DataType} if this parser can parse this value.
* Returns 'null' otherwise.
*/
public DataType<?> parseJobProperty(String key, String value);
}

View File

@ -0,0 +1,227 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.datatypes.util;
import java.lang.reflect.Field;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.tools.rumen.datatypes.*;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
/**
* A default parser for MapReduce job configuration properties.
* MapReduce job configuration properties are represented as key-value pairs.
* Each key represents a configuration knob which controls or affects the
* behavior of a MapReduce job or a job's task. The value associated with the
* configuration key represents its value. Some of the keys are deprecated. As a
* result of deprecation some keys change or are preferred over other keys,
* across versions. {@link MapReduceJobPropertiesParser} is a utility class that
* parses MapReduce job configuration properties and converts the value into a
* well defined {@link DataType}. Users can use the
* {@link MapReduceJobPropertiesParser#parseJobProperty()} API to process job
* configuration parameters. This API will parse a job property represented as a
* key-value pair and return the value wrapped inside a {@link DataType}.
* Callers can then use the returned {@link DataType} for further processing.
*
* {@link MapReduceJobPropertiesParser} thrives on the key name to decide which
* {@link DataType} to wrap the value with. Values for keys representing
* job-name, queue-name, user-name etc are wrapped inside {@link JobName},
* {@link QueueName}, {@link UserName} etc respectively. Keys ending with *dir*
* are considered as a directory and hence gets be wrapped inside
* {@link FileName}. Similarly key ending with *codec*, *log*, *class* etc are
* also handled accordingly. Values representing basic java data-types like
* integer, float, double, boolean etc are wrapped inside
* {@link DefaultDataType}. If the key represents some jvm-level settings then
* only standard settings are extracted and gets wrapped inside
* {@link DefaultDataType}. Currently only '-Xmx' and '-Xms' settings are
* considered while the rest are ignored.
*
* Note that the {@link MapReduceJobPropertiesParser#parseJobProperty()} API
* maps the keys to a configuration parameter listed in
* {@link MRJobConfig}. This not only filters non-framework specific keys thus
* ignoring user-specific and hard-to-parse keys but also provides a consistent
* view for all possible inputs. So if users invoke the
* {@link MapReduceJobPropertiesParser#parseJobProperty()} API with either
* <"mapreduce.job.user.name", "bob"> or <"user.name", "bob">, then the result
* would be a {@link UserName} {@link DataType} wrapping the user-name "bob".
*/
@SuppressWarnings("deprecation")
public class MapReduceJobPropertiesParser implements JobPropertyParser {
private Field[] mrFields = MRJobConfig.class.getFields();
private DecimalFormat format = new DecimalFormat();
private JobConf configuration = new JobConf(false);
private static final Pattern MAX_HEAP_PATTERN =
Pattern.compile("-Xmx[0-9]+[kKmMgGtT]?+");
private static final Pattern MIN_HEAP_PATTERN =
Pattern.compile("-Xms[0-9]+[kKmMgGtT]?+");
// turn off the warning w.r.t deprecated mapreduce keys
static {
Logger.getLogger(Configuration.class).setLevel(Level.OFF);
}
// Accepts a key if there is a corresponding key in the current mapreduce
// configuration
private boolean accept(String key) {
return getLatestKeyName(key) != null;
}
// Finds a corresponding key for the specified key in the current mapreduce
// setup.
// Note that this API uses a cached copy of the Configuration object. This is
// purely for performance reasons.
private String getLatestKeyName(String key) {
// set the specified key
configuration.set(key, key);
try {
// check if keys in MRConfig maps to the specified key.
for (Field f : mrFields) {
String mrKey = f.get(f.getName()).toString();
if (configuration.get(mrKey) != null) {
return mrKey;
}
}
// unset the key
return null;
} catch (IllegalAccessException iae) {
throw new RuntimeException(iae);
} finally {
// clean up!
configuration.clear();
}
}
@Override
public DataType<?> parseJobProperty(String key, String value) {
if (accept(key)) {
return fromString(key, value);
}
return null;
}
/**
* Extracts the -Xmx heap option from the specified string.
*/
public static void extractMaxHeapOpts(String javaOptions,
List<String> heapOpts,
List<String> others) {
for (String opt : javaOptions.split(" ")) {
Matcher matcher = MAX_HEAP_PATTERN.matcher(opt);
if (matcher.find()) {
heapOpts.add(opt);
} else {
others.add(opt);
}
}
}
/**
* Extracts the -Xms heap option from the specified string.
*/
public static void extractMinHeapOpts(String javaOptions,
List<String> heapOpts, List<String> others) {
for (String opt : javaOptions.split(" ")) {
Matcher matcher = MIN_HEAP_PATTERN.matcher(opt);
if (matcher.find()) {
heapOpts.add(opt);
} else {
others.add(opt);
}
}
}
// Maps the value of the specified key.
private DataType<?> fromString(String key, String value) {
if (value != null) {
// check known configs
// job-name
String latestKey = getLatestKeyName(key);
if (MRJobConfig.JOB_NAME.equals(latestKey)) {
return new JobName(value);
}
// user-name
if (MRJobConfig.USER_NAME.equals(latestKey)) {
return new UserName(value);
}
// queue-name
if (MRJobConfig.QUEUE_NAME.equals(latestKey)) {
return new QueueName(value);
}
if (MRJobConfig.MAP_JAVA_OPTS.equals(latestKey)
|| MRJobConfig.REDUCE_JAVA_OPTS.equals(latestKey)) {
List<String> heapOptions = new ArrayList<String>();
extractMaxHeapOpts(value, heapOptions, new ArrayList<String>());
extractMinHeapOpts(value, heapOptions, new ArrayList<String>());
return new DefaultDataType(StringUtils.join(heapOptions, ' '));
}
//TODO compression?
//TODO Other job configs like FileOutputFormat/FileInputFormat etc
// check if the config parameter represents a number
try {
format.parse(value);
return new DefaultDataType(value);
} catch (ParseException pe) {}
// check if the config parameters represents a boolean
// avoiding exceptions
if ("true".equals(value) || "false".equals(value)) {
Boolean.parseBoolean(value);
return new DefaultDataType(value);
}
// check if the config parameter represents a class
if (latestKey.endsWith(".class") || latestKey.endsWith(".codec")) {
return new ClassName(value);
}
// handle distributed cache sizes and timestamps
if (latestKey.endsWith("sizes")
|| latestKey.endsWith(".timestamps")) {
new DefaultDataType(value);
}
// check if the config parameter represents a file-system path
//TODO: Make this concrete .location .path .dir .jar?
if (latestKey.endsWith(".dir") || latestKey.endsWith(".location")
|| latestKey.endsWith(".jar") || latestKey.endsWith(".path")
|| latestKey.endsWith(".logfile") || latestKey.endsWith(".file")
|| latestKey.endsWith(".files") || latestKey.endsWith(".archives")) {
try {
return new FileName(value);
} catch (Exception ioe) {}
}
}
return null;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.serializers;
import java.io.IOException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.JsonSerializer;
import org.codehaus.jackson.map.SerializerProvider;
/**
* A JSON serializer for Strings.
*/
public class BlockingSerializer extends JsonSerializer<String> {
public void serialize(String object, JsonGenerator jGen, SerializerProvider sProvider)
throws IOException, JsonProcessingException {
jGen.writeNull();
};
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.serializers;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.rumen.datatypes.AnonymizableDataType;
import org.apache.hadoop.tools.rumen.state.StatePool;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.JsonSerializer;
import org.codehaus.jackson.map.SerializerProvider;
/**
* Default Rumen JSON serializer.
*/
@SuppressWarnings("unchecked")
public class DefaultAnonymizingRumenSerializer
extends JsonSerializer<AnonymizableDataType> {
private StatePool statePool;
private Configuration conf;
public DefaultAnonymizingRumenSerializer(StatePool statePool,
Configuration conf) {
this.statePool = statePool;
this.conf = conf;
}
public void serialize(AnonymizableDataType object, JsonGenerator jGen,
SerializerProvider sProvider)
throws IOException, JsonProcessingException {
Object val = object.getAnonymizedValue(statePool, conf);
// output the data if its a string
if (val instanceof String) {
jGen.writeString(val.toString());
} else {
// let the mapper (JSON generator) handle this anonymized object.
jGen.writeObject(val);
}
};
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.serializers;
import java.io.IOException;
import org.apache.hadoop.tools.rumen.datatypes.DataType;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.JsonSerializer;
import org.codehaus.jackson.map.SerializerProvider;
/**
* Default Rumen JSON serializer.
*/
@SuppressWarnings("unchecked")
public class DefaultRumenSerializer extends JsonSerializer<DataType> {
public void serialize(DataType object, JsonGenerator jGen, SerializerProvider sProvider)
throws IOException, JsonProcessingException {
Object data = object.getValue();
if (data instanceof String) {
jGen.writeString(data.toString());
} else {
jGen.writeObject(data);
}
};
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.serializers;
import java.io.IOException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.JsonSerializer;
import org.codehaus.jackson.map.SerializerProvider;
/**
* Rumen JSON serializer for serializing object using toSring() API.
*/
public class ObjectStringSerializer<T> extends JsonSerializer<T> {
public void serialize(T object, JsonGenerator jGen, SerializerProvider sProvider)
throws IOException, JsonProcessingException {
jGen.writeString(object.toString());
};
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.state;
import org.codehaus.jackson.annotate.JsonIgnore;
/**
* Represents a state. This state is managed by {@link StatePool}.
*
* Note that a {@link State} objects should be persistable. Currently, the
* {@link State} objects are persisted using the Jackson JSON library. Hence the
* implementors of the {@link State} interface should be careful while defining
* their public setter and getter APIs.
*/
public interface State {
/**
* Returns true if the state is updated since creation (or reload).
*/
@JsonIgnore
boolean isUpdated();
/**
* Get the name of the state.
*/
public String getName();
/**
* Set the name of the state.
*/
public void setName(String name);
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.state;
import java.io.IOException;
import org.apache.hadoop.tools.rumen.state.StatePool.StatePair;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.DeserializationContext;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.deser.StdDeserializer;
import org.codehaus.jackson.node.ObjectNode;
/**
* Rumen JSON deserializer for deserializing the {@link State} object.
*/
public class StateDeserializer extends StdDeserializer<StatePair> {
public StateDeserializer() {
super(StatePair.class);
}
@Override
public StatePair deserialize(JsonParser parser,
DeserializationContext context)
throws IOException, JsonProcessingException {
ObjectMapper mapper = (ObjectMapper) parser.getCodec();
// set the state-pair object tree
ObjectNode statePairObject = (ObjectNode) mapper.readTree(parser);
Class<?> stateClass = null;
try {
stateClass =
Class.forName(statePairObject.get("className").getTextValue().trim());
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException("Invalid classname!", cnfe);
}
String stateJsonString = statePairObject.get("state").toString();
State state = (State) mapper.readValue(stateJsonString, stateClass);
return new StatePair(state);
}
}

View File

@ -0,0 +1,345 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen.state;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.Anonymizer;
import org.apache.hadoop.tools.rumen.datatypes.DataType;
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.module.SimpleModule;
/**
* A pool of states. States used by {@link DataType}'s can be managed the
* {@link StatePool}. {@link StatePool} also supports persistence. Persistence
* is key to share states across multiple {@link Anonymizer} runs.
*/
@SuppressWarnings("unchecked")
public class StatePool {
private static final long VERSION = 1L;
private boolean isUpdated = false;
private boolean isInitialized = false;
private Configuration conf;
// persistence configuration
public static final String DIR_CONFIG = "rumen.anonymization.states.dir";
public static final String RELOAD_CONFIG =
"rumen.anonymization.states.reload";
public static final String PERSIST_CONFIG =
"rumen.anonymization.states.persist";
// internal state management configs
private static final String COMMIT_STATE_FILENAME = "latest";
private static final String CURRENT_STATE_FILENAME = "temp";
private String timeStamp;
private Path persistDirPath;
private boolean reload;
private boolean persist;
/**
* A wrapper class that binds the state implementation to its implementing
* class name.
*/
public static class StatePair {
private String className;
private State state;
public StatePair(State state) {
this.className = state.getClass().getName();
this.state = state;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public State getState() {
return state;
}
public void setState(State state) {
this.state = state;
}
}
/**
* Identifies to identify and cache {@link State}s.
*/
private HashMap<String, StatePair> pool = new HashMap<String, StatePair>();
public void addState(Class id, State state) {
if (pool.containsKey(id.getName())) {
throw new RuntimeException("State '" + state.getName() + "' added for the"
+ " class " + id.getName() + " already exists!");
}
isUpdated = true;
pool.put(id.getName(), new StatePair(state));
}
public State getState(Class clazz) {
return pool.containsKey(clazz.getName())
? pool.get(clazz.getName()).getState()
: null;
}
// For testing
@JsonIgnore
public boolean isUpdated() {
if (!isUpdated) {
for (StatePair statePair : pool.values()) {
// if one of the states have changed, then the pool is dirty
if (statePair.getState().isUpdated()) {
isUpdated = true;
return true;
}
}
}
return isUpdated;
}
/**
* Initialized the {@link StatePool}. This API also reloads the previously
* persisted state. Note that the {@link StatePool} should be initialized only
* once.
*/
public void initialize(Configuration conf) throws Exception {
if (isInitialized) {
throw new RuntimeException("StatePool is already initialized!");
}
this.conf = conf;
String persistDir = conf.get(DIR_CONFIG);
reload = conf.getBoolean(RELOAD_CONFIG, false);
persist = conf.getBoolean(PERSIST_CONFIG, false);
// reload if configured
if (reload || persist) {
System.out.println("State Manager initializing. State directory : "
+ persistDir);
System.out.println("Reload:" + reload + " Persist:" + persist);
if (persistDir == null) {
throw new RuntimeException("No state persist directory configured!"
+ " Disable persistence.");
} else {
this.persistDirPath = new Path(persistDir);
}
} else {
System.out.println("State Manager disabled.");
}
// reload
reload();
// now set the timestamp
DateFormat formatter =
new SimpleDateFormat("dd-MMM-yyyy-hh'H'-mm'M'-ss'S'");
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(System.currentTimeMillis());
timeStamp = formatter.format(calendar.getTime());
isInitialized = true;
}
private void reload() throws Exception {
if (reload) {
// Reload persisted entries
Path stateFilename = new Path(persistDirPath, COMMIT_STATE_FILENAME);
FileSystem fs = stateFilename.getFileSystem(conf);
if (fs.exists(stateFilename)) {
reloadState(stateFilename, conf);
} else {
throw new RuntimeException("No latest state persist directory found!"
+ " Disable persistence and run.");
}
}
}
private void reloadState(Path stateFile, Configuration conf)
throws Exception {
FileSystem fs = stateFile.getFileSystem(conf);
if (fs.exists(stateFile)) {
System.out.println("Reading state from " + stateFile.toString());
FSDataInputStream in = fs.open(stateFile);
read(in);
in.close();
} else {
System.out.println("No state information found for " + stateFile);
}
}
private void read(DataInput in) throws IOException {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(
DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
// define a module
SimpleModule module = new SimpleModule("State Serializer",
new Version(0, 1, 1, "FINAL"));
// add the state deserializer
module.addDeserializer(StatePair.class, new StateDeserializer());
// register the module with the object-mapper
mapper.registerModule(module);
JsonParser parser =
mapper.getJsonFactory().createJsonParser((DataInputStream)in);
StatePool statePool = mapper.readValue(parser, StatePool.class);
this.setStates(statePool.getStates());
parser.close();
}
/**
* Persists the current state to the state directory. The state will be
* persisted to the 'latest' file in the state directory.
*/
public void persist() throws IOException {
if (!persist) {
return;
}
if (isUpdated()) {
System.out.println("State is updated! Committing.");
Path currStateFile = new Path(persistDirPath, CURRENT_STATE_FILENAME);
Path commitStateFile = new Path(persistDirPath, COMMIT_STATE_FILENAME);
FileSystem fs = currStateFile.getFileSystem(conf);
System.out.println("Starting the persist phase. Persisting to "
+ currStateFile.toString());
// persist current state
// write the contents of the current state to the current(temp) directory
FSDataOutputStream out = fs.create(currStateFile, true);
write(out);
out.close();
System.out.println("Persist phase over. The best known un-committed state"
+ " is located at " + currStateFile.toString());
// commit (phase-1)
// copy the previous commit file to the relocation file
if (fs.exists(commitStateFile)) {
Path commitRelocationFile = new Path(persistDirPath, timeStamp);
System.out.println("Starting the pre-commit phase. Moving the previous "
+ "best known state to " + commitRelocationFile.toString());
// copy the commit file to the relocation file
FileUtil.copy(fs,commitStateFile, fs, commitRelocationFile, false,
conf);
}
// commit (phase-2)
System.out.println("Starting the commit phase. Committing the states in "
+ currStateFile.toString());
FileUtil.copy(fs, currStateFile, fs, commitStateFile, true, true, conf);
System.out.println("Commit phase successful! The best known committed "
+ "state is located at " + commitStateFile.toString());
} else {
System.out.println("State not updated! No commit required.");
}
}
private void write(DataOutput out) throws IOException {
// This is just a JSON experiment
System.out.println("Dumping the StatePool's in JSON format.");
ObjectMapper outMapper = new ObjectMapper();
outMapper.configure(
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
// define a module
SimpleModule module = new SimpleModule("State Serializer",
new Version(0, 1, 1, "FINAL"));
// add the state serializer
//module.addSerializer(State.class, new StateSerializer());
// register the module with the object-mapper
outMapper.registerModule(module);
JsonFactory outFactory = outMapper.getJsonFactory();
JsonGenerator jGen =
outFactory.createJsonGenerator((DataOutputStream)out, JsonEncoding.UTF8);
jGen.useDefaultPrettyPrinter();
jGen.writeObject(this);
jGen.close();
}
/**
* Getters and setters for JSON serialization
*/
/**
* To be invoked only by the Jackson JSON serializer.
*/
public long getVersion() {
return VERSION;
}
/**
* To be invoked only by the Jackson JSON deserializer.
*/
public void setVersion(long version) {
if (version != VERSION) {
throw new RuntimeException("Version mismatch! Expected " + VERSION
+ " got " + version);
}
}
/**
* To be invoked only by the Jackson JSON serializer.
*/
public HashMap<String, StatePair> getStates() {
return pool;
}
/**
* To be invoked only by the Jackson JSON deserializer.
*/
public void setStates(HashMap<String, StatePair> states) {
if (pool.size() > 0) {
throw new RuntimeException("Pool not empty!");
}
//TODO Should we do a clone?
this.pool = states;
}
}