MAPREDUCE-778. Rumen Anonymizer. (Amar Kamat and Chris Douglas via amarrk)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1215141 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6d551b83de
commit
a238f931ea
|
@ -6,6 +6,7 @@ Trunk (unreleased changes)
|
|||
MAPREDUCE-3545. Remove Avro RPC. (suresh)
|
||||
|
||||
NEW FEATURES
|
||||
MAPREDUCE-778. Rumen Anonymizer. (Amar Kamat and Chris Douglas via amarrk)
|
||||
|
||||
MAPREDUCE-2669. Add new examples for Mean, Median, and Standard Deviation.
|
||||
(Plamen Jeliazkov via shv)
|
||||
|
|
|
@ -139,6 +139,13 @@
|
|||
<dependency org="org.vafer" name="jdeb" rev="${jdeb.version}" conf="package->master"/>
|
||||
<dependency org="org.mortbay.jetty" name="jetty-servlet-tester" rev="${jetty.version}"
|
||||
conf="test->default"/>
|
||||
|
||||
<!-- dependency for rumen anonymization -->
|
||||
<dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="${jackson.version}"
|
||||
conf="compile->default"/>
|
||||
<dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="${jackson.version}"
|
||||
conf="compile->default"/>
|
||||
|
||||
<!-- dependency addition for the fault injection -->
|
||||
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
|
||||
conf="compile->default"/>
|
||||
|
|
|
@ -81,5 +81,6 @@ wagon-http.version=1.0-beta-2
|
|||
xmlenc.version=0.52
|
||||
xerces.version=1.4.4
|
||||
|
||||
jackson.version=1.8.2
|
||||
yarn.version=0.24.0-SNAPSHOT
|
||||
hadoop-mapreduce.version=0.24.0-SNAPSHOT
|
||||
|
|
|
@ -26,8 +26,6 @@ import java.util.concurrent.Callable;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -49,6 +47,7 @@ import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
|||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.tools.rumen.JobStory;
|
||||
import static org.apache.hadoop.tools.rumen.datatypes.util.MapReduceJobPropertiesParser.extractMaxHeapOpts;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -92,8 +91,6 @@ abstract class GridmixJob implements Callable<Job>, Delayed {
|
|||
// configuration key to enable/disable task jvm options
|
||||
static final String GRIDMIX_TASK_JVM_OPTIONS_ENABLE =
|
||||
"gridmix.task.jvm-options.enable";
|
||||
private static final Pattern maxHeapPattern =
|
||||
Pattern.compile("-Xmx[0-9]+[kKmMgGtT]?+");
|
||||
|
||||
private static void setJobQueue(Job job, String queue) {
|
||||
if (queue != null) {
|
||||
|
@ -226,18 +223,6 @@ abstract class GridmixJob implements Callable<Job>, Delayed {
|
|||
}
|
||||
}
|
||||
|
||||
private static void extractMaxHeapOpts(String javaOptions,
|
||||
List<String> maxOpts, List<String> others) {
|
||||
for (String opt : javaOptions.split(" ")) {
|
||||
Matcher matcher = maxHeapPattern.matcher(opt);
|
||||
if (matcher.find()) {
|
||||
maxOpts.add(opt);
|
||||
} else {
|
||||
others.add(opt);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Scales the desired job-level configuration parameter. This API makes sure
|
||||
// that the ratio of the job level configuration parameter to the cluster
|
||||
// level configuration parameter is maintained in the simulated run. Hence
|
||||
|
|
|
@ -73,6 +73,11 @@
|
|||
computed for the total number of successful tasks for every attempt.
|
||||
|
||||
</li>
|
||||
<li>Anonymized traces enables sharing of production traces of large
|
||||
scale Hadoop deployments. Sharing of traces will foster
|
||||
collaboration within the Hadoop community. It can also be used to
|
||||
supplement interesting research findings.
|
||||
</li>
|
||||
</ul>
|
||||
</section>
|
||||
|
||||
|
@ -102,6 +107,11 @@
|
|||
Increasing the trace runtime might involve adding some dummy jobs to
|
||||
the resulting trace and scaling up the runtime of individual jobs.
|
||||
</li>
|
||||
<li><em>Anonymizer</em> :
|
||||
A utility to anonymize Hadoop job and cluster topology traces by
|
||||
masking certain sensitive fields but retaining important workload
|
||||
characteristics.
|
||||
</li>
|
||||
|
||||
</ul>
|
||||
<p></p><p></p><p></p>
|
||||
|
@ -128,10 +138,11 @@
|
|||
<code>output-duration</code>, <code>concentration</code> etc.
|
||||
</note>
|
||||
|
||||
<p><em>Rumen</em> provides 2 basic commands</p>
|
||||
<p><em>Rumen</em> provides 3 basic commands</p>
|
||||
<ul>
|
||||
<li><code>TraceBuilder</code></li>
|
||||
<li><code>Folder</code></li>
|
||||
<li><code>Anonymizer</code></li>
|
||||
</ul>
|
||||
|
||||
<p>Firstly, we need to generate the <em>Gold Trace</em>. Hence the first
|
||||
|
@ -139,8 +150,9 @@
|
|||
The output of the <code>TraceBuilder</code> is a job-trace file (and an
|
||||
optional cluster-topology file). In case we want to scale the output, we
|
||||
can use the <code>Folder</code> utility to fold the current trace to the
|
||||
desired length. The remaining part of this section explains these
|
||||
utilities in detail.
|
||||
desired length. For anonymizing the trace, use the
|
||||
<code>Anonymizer</code> utility. The remaining part of this section
|
||||
explains these utilities in detail.
|
||||
</p>
|
||||
|
||||
<note>Examples in this section assumes that certain libraries are present
|
||||
|
@ -426,8 +438,156 @@
|
|||
</p>
|
||||
</section>
|
||||
</section>
|
||||
<p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p>
|
||||
<p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p>
|
||||
<p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p>
|
||||
<p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p><p></p>
|
||||
<p></p><p></p><p></p><p></p>
|
||||
|
||||
</section>
|
||||
|
||||
<!--
|
||||
Anonymizer command
|
||||
-->
|
||||
<section>
|
||||
<title>Anonymizer</title>
|
||||
|
||||
<p><code>Command:</code></p>
|
||||
<source>java org.apache.hadoop.tools.rumen.Anonymizer [options] [-trace <jobtrace-input> <jobtrace-output>] [-topology <topology-input> <topology-output>]</source>
|
||||
|
||||
<p>This command invokes the <em>Anonymizer</em> utility of
|
||||
<em>Rumen</em>. It anonymizes sensitive information from the
|
||||
<code><jobtrace-input></code> file and outputs the anonymized
|
||||
content into the <code><jobtrace-output></code>
|
||||
file. It also anonymizes the cluster layout (topology) from the
|
||||
<code><topology-input></code> and outputs it in
|
||||
the <code><topology-output></code> file.
|
||||
<code><job-input></code> represents the job trace file obtained
|
||||
using <code>TraceBuilder</code> or <code>Folder</code>.
|
||||
<code><topology-input></code> represents the cluster topology
|
||||
file obtained using <code>TraceBuilder</code>.
|
||||
</p>
|
||||
|
||||
<p><code>Options :</code></p>
|
||||
<table>
|
||||
<tr>
|
||||
<th>Parameter</th>
|
||||
<th>Description</th>
|
||||
<th>Notes</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>-trace</code></td>
|
||||
<td>Anonymizes job traces.</td>
|
||||
<td>Anonymizes sensitive fields like user-name, job-name, queue-name
|
||||
host-names, job configuration parameters etc.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>-topology</code></td>
|
||||
<td>Anonymizes cluster topology</td>
|
||||
<td>Anonymizes rack-names and host-names.</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
<section id="anonymizerconf">
|
||||
<title><em>Anonymizer</em> Configuration Parameters</title>
|
||||
<p>The Rumen anonymizer can be configured using the following
|
||||
configuration parameters:
|
||||
</p>
|
||||
<table>
|
||||
<tr>
|
||||
<th>Parameter</th>
|
||||
<th>Description</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
<code>rumen.data-types.classname.preserve</code>
|
||||
</td>
|
||||
<td>A comma separated list of prefixes that the <em>Anonymizer</em>
|
||||
will not anonymize while processing classnames. If
|
||||
<code>rumen.data-types.classname.preserve</code> is set to
|
||||
<code>'org.apache,com.hadoop.'</code> then
|
||||
classnames starting with <code>'org.apache'</code> or
|
||||
<code>'com.hadoop.'</code> will not be anonymized.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
<code>rumen.datatypes.jobproperties.parsers</code>
|
||||
</td>
|
||||
<td>A comma separated list of job properties parsers. These parsers
|
||||
decide how the job configuration parameters
|
||||
(i.e <key,value> pairs) should be processed. Default is
|
||||
<code>MapReduceJobPropertiesParser</code>. The default parser will
|
||||
only parse framework-level MapReduce specific job configuration
|
||||
properties. Users can add custom parsers by implementing the
|
||||
<code>JobPropertiesParser</code> interface. Rumen also provides an
|
||||
all-pass (i.e no filter) parser called
|
||||
<code>DefaultJobPropertiesParser</code>.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
<code>rumen.anonymization.states.dir</code>
|
||||
</td>
|
||||
<td>Set this to a location (on LocalFileSystem or HDFS) for enabling
|
||||
state persistence and/or reload. This parameter is not set by
|
||||
default. Reloading and persistence of states depend on the state
|
||||
directory. Note that the state directory will contain the latest
|
||||
as well as previous states.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
<code>rumen.anonymization.states.persist</code>
|
||||
</td>
|
||||
<td>Set this to <code>'true'</code> to persist the current state.
|
||||
Default value is <code>'false'</code>. Note that the states will
|
||||
be persisted to the state manager's state directory
|
||||
specified using the <code>rumen.anonymization.states.dir</code>
|
||||
parameter.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
<code>rumen.anonymization.states.reload</code>
|
||||
</td>
|
||||
<td>Set this to <code>'true'</code> to enable reuse of previously
|
||||
persisted state. The default value is <code>'false'</code>. The
|
||||
previously persisted state will be reloaded from the state
|
||||
manager's state directory specified using the
|
||||
<code>rumen.anonymization.states.dir</code> parameter. Note that
|
||||
the <em>Anonymizer</em> will bail out if it fails to find any
|
||||
previously persisted state in the state directory or if the state
|
||||
directory is not set. If the user wishes to retain/reuse the
|
||||
states across multiple invocations of the <em>Anonymizer</em>,
|
||||
then the very first invocation of the <em>Anonymizer</em> should
|
||||
have <code>rumen.anonymization.states.reload</code> set to
|
||||
<code>'false'</code> and
|
||||
<code>rumen.anonymization.states.persist</code> set to
|
||||
<code>'true'</code>. Subsequent invocations of the
|
||||
<em>Anonymizer</em> can then have
|
||||
<code>rumen.anonymization.states.reload</code> set to
|
||||
<code>'true'</code>.
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<title>Example</title>
|
||||
<source>java org.apache.hadoop.tools.rumen.Anonymizer -trace file:///home/user/job-trace.json file:///home/user/job-trace-anonymized.json -topology file:///home/user/cluster-topology.json file:///home/user/cluster-topology-anonymized.json</source>
|
||||
<p></p>
|
||||
<p>This will anonymize the job details from
|
||||
<code>file:///home/user/job-trace.json</code> and output it to
|
||||
<code>file:///home/user/job-trace-anonymized.json</code>.
|
||||
It will also anonymize the cluster topology layout from
|
||||
<code>file:///home/user/cluster-topology.json</code> and output it to
|
||||
<code>file:///home/user/cluster-topology-anonymized.json</code>.
|
||||
Note that the <code>Anonymizer</code> also supports input and output
|
||||
files on HDFS.
|
||||
</p>
|
||||
</section>
|
||||
</section>
|
||||
<p></p><p></p><p></p>
|
||||
</section>
|
||||
|
||||
<!--
|
||||
|
@ -452,8 +612,8 @@
|
|||
<li><code>Hadoop Common</code> (<code>hadoop-common-{hadoop-version}.jar</code>)</li>
|
||||
<li><code>Apache Commons Logging</code> (<code>commons-logging-1.1.1.jar</code>)</li>
|
||||
<li><code>Apache Commons CLI</code> (<code>commons-cli-1.2.jar</code>)</li>
|
||||
<li><code>Jackson Mapper</code> (<code>jackson-mapper-asl-1.4.2.jar</code>)</li>
|
||||
<li><code>Jackson Core</code> (<code>jackson-core-asl-1.4.2.jar</code>)</li>
|
||||
<li><code>Jackson Mapper</code> (<code>jackson-mapper-asl-1.8.2.jar</code>)</li>
|
||||
<li><code>Jackson Core</code> (<code>jackson-core-asl-1.8.2.jar</code>)</li>
|
||||
</ul>
|
||||
|
||||
<note>One simple way to run Rumen is to use '$HADOOP_PREFIX/bin/hadoop jar'
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -20,12 +20,8 @@ package org.apache.hadoop.tools.rumen;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
|
|
@ -960,11 +960,11 @@ public class TestRumenJobTraces {
|
|||
|
||||
for (LoggedNetworkTopology rack : racks) {
|
||||
List<LoggedNetworkTopology> nodes = rack.getChildren();
|
||||
if (rack.getName().endsWith(".64")) {
|
||||
if (rack.getName().getValue().endsWith(".64")) {
|
||||
assertEquals("The singleton rack has the wrong number of elements", 1,
|
||||
nodes.size());
|
||||
sawSingleton = true;
|
||||
} else if (rack.getName().endsWith(".80")) {
|
||||
} else if (rack.getName().getValue().endsWith(".80")) {
|
||||
assertEquals("The doubleton rack has the wrong number of elements", 2,
|
||||
nodes.size());
|
||||
sawDoubleton = true;
|
||||
|
|
|
@ -0,0 +1,273 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools.rumen;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.compress.CodecPool;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
import org.apache.hadoop.mapreduce.ID;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.*;
|
||||
import org.apache.hadoop.tools.rumen.serializers.*;
|
||||
import org.apache.hadoop.tools.rumen.state.*;
|
||||
|
||||
import org.codehaus.jackson.JsonEncoding;
|
||||
import org.codehaus.jackson.JsonFactory;
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.Version;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.map.module.SimpleModule;
|
||||
|
||||
public class Anonymizer extends Configured implements Tool {
|
||||
private boolean anonymizeTrace = false;
|
||||
private Path inputTracePath = null;
|
||||
private Path outputTracePath = null;
|
||||
private boolean anonymizeTopology = false;
|
||||
private Path inputTopologyPath = null;
|
||||
private Path outputTopologyPath = null;
|
||||
|
||||
//TODO Make this final if not using JSON
|
||||
// private final StatePool statePool = new StatePool();
|
||||
private StatePool statePool;
|
||||
|
||||
private ObjectMapper outMapper = null;
|
||||
private JsonFactory outFactory = null;
|
||||
|
||||
private void initialize(String[] args) throws Exception {
|
||||
try {
|
||||
for (int i = 0; i < args.length; ++i) {
|
||||
if ("-trace".equals(args[i])) {
|
||||
anonymizeTrace = true;
|
||||
inputTracePath = new Path(args[i+1]);
|
||||
outputTracePath = new Path(args[i+2]);
|
||||
i +=2;
|
||||
}
|
||||
if ("-topology".equals(args[i])) {
|
||||
anonymizeTopology = true;
|
||||
inputTopologyPath = new Path(args[i+1]);
|
||||
outputTopologyPath = new Path(args[i+2]);
|
||||
i +=2;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("Illegal arguments list!", e);
|
||||
}
|
||||
|
||||
if (!anonymizeTopology && !anonymizeTrace) {
|
||||
throw new IllegalArgumentException("Invalid arguments list!");
|
||||
}
|
||||
|
||||
statePool = new StatePool();
|
||||
// initialize the state manager after the anonymizers are registered
|
||||
statePool.initialize(getConf());
|
||||
|
||||
outMapper = new ObjectMapper();
|
||||
// define a module
|
||||
SimpleModule module = new SimpleModule("Anonymization Serializer",
|
||||
new Version(0, 1, 1, "FINAL"));
|
||||
// add various serializers to the module
|
||||
// use the default (as-is) serializer for default data types
|
||||
module.addSerializer(DataType.class, new DefaultRumenSerializer());
|
||||
// use a blocking serializer for Strings as they can contain sensitive
|
||||
// information
|
||||
module.addSerializer(String.class, new BlockingSerializer());
|
||||
// use object.toString() for object of type ID
|
||||
module.addSerializer(ID.class, new ObjectStringSerializer<ID>());
|
||||
// use getAnonymizedValue() for data types that have the anonymizing
|
||||
// feature
|
||||
module.addSerializer(AnonymizableDataType.class,
|
||||
new DefaultAnonymizingRumenSerializer(statePool, getConf()));
|
||||
|
||||
// register the module with the object-mapper
|
||||
outMapper.registerModule(module);
|
||||
|
||||
outFactory = outMapper.getJsonFactory();
|
||||
}
|
||||
|
||||
// anonymize the job trace file
|
||||
private void anonymizeTrace() throws Exception {
|
||||
if (anonymizeTrace) {
|
||||
System.out.println("Anonymizing trace file: " + inputTracePath);
|
||||
JobTraceReader reader = null;
|
||||
JsonGenerator outGen = null;
|
||||
Configuration conf = getConf();
|
||||
|
||||
try {
|
||||
// create a generator
|
||||
outGen = createJsonGenerator(conf, outputTracePath);
|
||||
|
||||
// define the input trace reader
|
||||
reader = new JobTraceReader(inputTracePath, conf);
|
||||
|
||||
// read the plain unanonymized logged job
|
||||
LoggedJob job = reader.getNext();
|
||||
|
||||
while (job != null) {
|
||||
// write it via an anonymizing channel
|
||||
outGen.writeObject(job);
|
||||
// read the next job
|
||||
job = reader.getNext();
|
||||
}
|
||||
|
||||
System.out.println("Anonymized trace file: " + outputTracePath);
|
||||
} finally {
|
||||
if (outGen != null) {
|
||||
outGen.close();
|
||||
}
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// anonymize the cluster topology file
|
||||
private void anonymizeTopology() throws Exception {
|
||||
if (anonymizeTopology) {
|
||||
System.out.println("Anonymizing topology file: " + inputTopologyPath);
|
||||
ClusterTopologyReader reader = null;
|
||||
JsonGenerator outGen = null;
|
||||
Configuration conf = getConf();
|
||||
|
||||
try {
|
||||
// create a generator
|
||||
outGen = createJsonGenerator(conf, outputTopologyPath);
|
||||
|
||||
// define the input cluster topology reader
|
||||
reader = new ClusterTopologyReader(inputTopologyPath, conf);
|
||||
|
||||
// read the plain unanonymized logged job
|
||||
LoggedNetworkTopology job = reader.get();
|
||||
|
||||
// write it via an anonymizing channel
|
||||
outGen.writeObject(job);
|
||||
|
||||
System.out.println("Anonymized topology file: " + outputTopologyPath);
|
||||
} finally {
|
||||
if (outGen != null) {
|
||||
outGen.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Creates a JSON generator
|
||||
private JsonGenerator createJsonGenerator(Configuration conf, Path path)
|
||||
throws IOException {
|
||||
FileSystem outFS = path.getFileSystem(conf);
|
||||
CompressionCodec codec =
|
||||
new CompressionCodecFactory(conf).getCodec(path);
|
||||
OutputStream output;
|
||||
Compressor compressor = null;
|
||||
if (codec != null) {
|
||||
compressor = CodecPool.getCompressor(codec);
|
||||
output = codec.createOutputStream(outFS.create(path), compressor);
|
||||
} else {
|
||||
output = outFS.create(path);
|
||||
}
|
||||
|
||||
JsonGenerator outGen = outFactory.createJsonGenerator(output,
|
||||
JsonEncoding.UTF8);
|
||||
outGen.useDefaultPrettyPrinter();
|
||||
|
||||
return outGen;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
try {
|
||||
initialize(args);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
printUsage();
|
||||
return -1;
|
||||
}
|
||||
|
||||
return run();
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the actual anonymization tool.
|
||||
*/
|
||||
public int run() throws Exception {
|
||||
try {
|
||||
anonymizeTrace();
|
||||
} catch (IOException ioe) {
|
||||
System.err.println("Error running the trace anonymizer!");
|
||||
ioe.printStackTrace();
|
||||
System.out.println("\n\nAnonymization unsuccessful!");
|
||||
return -1;
|
||||
}
|
||||
|
||||
try {
|
||||
anonymizeTopology();
|
||||
} catch (IOException ioe) {
|
||||
System.err.println("Error running the cluster topology anonymizer!");
|
||||
ioe.printStackTrace();
|
||||
System.out.println("\n\nAnonymization unsuccessful!");
|
||||
return -1;
|
||||
}
|
||||
|
||||
statePool.persist();
|
||||
|
||||
System.out.println("Anonymization completed successfully!");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
private static void printUsage() {
|
||||
System.out.println("\nUsage:-");
|
||||
System.out.print(" Anonymizer");
|
||||
System.out.print(" [-trace <input-trace-path> <output-trace-path>]");
|
||||
System.out.println(" [-topology <input-topology-path> "
|
||||
+ "<output-topology-path>] ");
|
||||
System.out.print("\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* The main driver program to use the anonymization utility.
|
||||
* @param args
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
Anonymizer instance = new Anonymizer();
|
||||
int result = 0;
|
||||
|
||||
try {
|
||||
result = ToolRunner.run(instance, args);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace(System.err);
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
if (result != 0) {
|
||||
System.exit(result);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
|
@ -35,23 +35,12 @@ import org.apache.commons.logging.LogFactory;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.compress.CodecPool;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import org.codehaus.jackson.JsonEncoding;
|
||||
import org.codehaus.jackson.JsonFactory;
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.map.SerializationConfig;
|
||||
|
||||
public class Folder extends Configured implements Tool {
|
||||
private long outputDuration = -1;
|
||||
private long inputCycle = -1;
|
||||
|
@ -66,7 +55,7 @@ public class Folder extends Configured implements Tool {
|
|||
static final private Log LOG = LogFactory.getLog(Folder.class);
|
||||
|
||||
private DeskewedJobTraceReader reader = null;
|
||||
private JsonGenerator outGen = null;
|
||||
private Outputter<LoggedJob> outGen = null;
|
||||
|
||||
private List<Path> tempPaths = new LinkedList<Path>();
|
||||
|
||||
|
@ -171,25 +160,8 @@ public class Folder extends Configured implements Tool {
|
|||
skewBufferLength, !allowMissorting);
|
||||
Path outPath = new Path(outputPathName);
|
||||
|
||||
ObjectMapper outMapper = new ObjectMapper();
|
||||
outMapper.configure(
|
||||
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
|
||||
JsonFactory outFactory = outMapper.getJsonFactory();
|
||||
FileSystem outFS = outPath.getFileSystem(conf);
|
||||
|
||||
CompressionCodec codec =
|
||||
new CompressionCodecFactory(conf).getCodec(outPath);
|
||||
OutputStream output;
|
||||
Compressor compressor = null;
|
||||
if (codec != null) {
|
||||
compressor = CodecPool.getCompressor(codec);
|
||||
output = codec.createOutputStream(outFS.create(outPath), compressor);
|
||||
} else {
|
||||
output = outFS.create(outPath);
|
||||
}
|
||||
|
||||
outGen = outFactory.createJsonGenerator(output, JsonEncoding.UTF8);
|
||||
outGen.useDefaultPrettyPrinter();
|
||||
outGen = new DefaultOutputter<LoggedJob>();
|
||||
outGen.init(outPath, conf);
|
||||
|
||||
tempDir =
|
||||
tempDirName == null ? outPath.getParent() : new Path(tempDirName);
|
||||
|
@ -258,11 +230,6 @@ public class Folder extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
ObjectMapper outMapper = new ObjectMapper();
|
||||
outMapper.configure(
|
||||
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
|
||||
JsonFactory outFactory = outMapper.getJsonFactory();
|
||||
|
||||
// we initialize an empty heap so if we take an error before establishing
|
||||
// a real one the finally code goes through
|
||||
Queue<Pair<LoggedJob, JobTraceReader>> heap =
|
||||
|
@ -310,8 +277,7 @@ public class Folder extends Configured implements Tool {
|
|||
long currentIntervalEnd = Long.MIN_VALUE;
|
||||
|
||||
Path nextSegment = null;
|
||||
OutputStream tempUncompOut = null;
|
||||
JsonGenerator tempGen = null;
|
||||
Outputter<LoggedJob> tempGen = null;
|
||||
|
||||
if (debug) {
|
||||
LOG.debug("The first job has a submit time of " + firstJobSubmitTime);
|
||||
|
@ -333,7 +299,9 @@ public class Folder extends Configured implements Tool {
|
|||
if (tempGen != null) {
|
||||
tempGen.close();
|
||||
}
|
||||
for (int i = 0; i < 3 && tempUncompOut == null; ++i) {
|
||||
|
||||
nextSegment = null;
|
||||
for (int i = 0; i < 3 && nextSegment == null; ++i) {
|
||||
try {
|
||||
nextSegment =
|
||||
new Path(tempDir, "segment-" + tempNameGenerator.nextLong()
|
||||
|
@ -347,7 +315,7 @@ public class Folder extends Configured implements Tool {
|
|||
|
||||
try {
|
||||
if (!fs.exists(nextSegment)) {
|
||||
tempUncompOut = fs.create(nextSegment, false);
|
||||
break;
|
||||
}
|
||||
|
||||
continue;
|
||||
|
@ -360,6 +328,10 @@ public class Folder extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
if (nextSegment == null) {
|
||||
throw new RuntimeException("Failed to create a new file!");
|
||||
}
|
||||
|
||||
if (debug) {
|
||||
LOG.debug("Creating " + nextSegment
|
||||
+ " for a job with a submit time of " + job.getSubmitTime());
|
||||
|
@ -369,23 +341,8 @@ public class Folder extends Configured implements Tool {
|
|||
|
||||
tempPaths.add(nextSegment);
|
||||
|
||||
CompressionCodec codec =
|
||||
new CompressionCodecFactory(conf).getCodec(nextSegment);
|
||||
OutputStream output;
|
||||
Compressor compressor = null;
|
||||
if (codec != null) {
|
||||
compressor = CodecPool.getCompressor(codec);
|
||||
output = codec.createOutputStream(tempUncompOut, compressor);
|
||||
} else {
|
||||
output = tempUncompOut;
|
||||
}
|
||||
|
||||
tempUncompOut = null;
|
||||
|
||||
tempGen = outFactory.createJsonGenerator(output, JsonEncoding.UTF8);
|
||||
if (debug) {
|
||||
tempGen.useDefaultPrettyPrinter();
|
||||
}
|
||||
tempGen = new DefaultOutputter<LoggedJob>();
|
||||
tempGen.init(nextSegment, conf);
|
||||
|
||||
long currentIntervalNumber =
|
||||
(job.getSubmitTime() - firstJobSubmitTime) / inputCycle;
|
||||
|
@ -396,7 +353,9 @@ public class Folder extends Configured implements Tool {
|
|||
|
||||
// the temp files contain UDadjusted times, but each temp file's
|
||||
// content is in the same input cycle interval.
|
||||
tempGen.writeObject(job);
|
||||
if (tempGen != null) {
|
||||
tempGen.output(job);
|
||||
}
|
||||
|
||||
job = reader.nextJob();
|
||||
}
|
||||
|
@ -541,11 +500,11 @@ public class Folder extends Configured implements Tool {
|
|||
|
||||
private void maybeOutput(LoggedJob job) throws IOException {
|
||||
for (int i = 0; i < transcriptionRateInteger; ++i) {
|
||||
outGen.writeObject(job);
|
||||
outGen.output(job);
|
||||
}
|
||||
|
||||
if (random.nextDouble() < transcriptionRateFraction) {
|
||||
outGen.writeObject(job);
|
||||
outGen.output(job);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -56,12 +56,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
|||
import org.apache.hadoop.io.compress.CodecPool;
|
||||
import org.apache.hadoop.io.compress.Decompressor;
|
||||
|
||||
import org.codehaus.jackson.JsonEncoding;
|
||||
import org.codehaus.jackson.JsonFactory;
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.JsonProcessingException;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.map.SerializationConfig;
|
||||
|
||||
/**
|
||||
* This is the main class for rumen log mining functionality.
|
||||
|
@ -126,7 +121,7 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
|
|||
*/
|
||||
private boolean omitTaskDetails = false;
|
||||
|
||||
private JsonGenerator jobTraceGen = null;
|
||||
private Outputter<LoggedJob> jobTraceGen = null;
|
||||
|
||||
private boolean prettyprintTrace = true;
|
||||
|
||||
|
@ -148,7 +143,7 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
|
|||
|
||||
private int[] attemptTimesPercentiles;
|
||||
|
||||
private JsonGenerator topologyGen = null;
|
||||
private Outputter<LoggedNetworkTopology> topologyGen = null;
|
||||
|
||||
private HashSet<ParsedHost> allHosts = new HashSet<ParsedHost>();
|
||||
|
||||
|
@ -502,28 +497,12 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
|
|||
}
|
||||
|
||||
if (jobTraceFilename != null) {
|
||||
ObjectMapper jmapper = new ObjectMapper();
|
||||
jmapper.configure(
|
||||
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
|
||||
JsonFactory jfactory = jmapper.getJsonFactory();
|
||||
FileSystem jobFS = jobTraceFilename.getFileSystem(getConf());
|
||||
jobTraceGen =
|
||||
jfactory.createJsonGenerator(jobFS.create(jobTraceFilename),
|
||||
JsonEncoding.UTF8);
|
||||
if (prettyprintTrace) {
|
||||
jobTraceGen.useDefaultPrettyPrinter();
|
||||
}
|
||||
jobTraceGen = new DefaultOutputter<LoggedJob>();
|
||||
jobTraceGen.init(jobTraceFilename, getConf());
|
||||
|
||||
if (topologyFilename != null) {
|
||||
ObjectMapper tmapper = new ObjectMapper();
|
||||
tmapper.configure(
|
||||
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
|
||||
JsonFactory tfactory = tmapper.getJsonFactory();
|
||||
FileSystem topoFS = topologyFilename.getFileSystem(getConf());
|
||||
topologyGen =
|
||||
tfactory.createJsonGenerator(topoFS.create(topologyFilename),
|
||||
JsonEncoding.UTF8);
|
||||
topologyGen.useDefaultPrettyPrinter();
|
||||
topologyGen = new DefaultOutputter<LoggedNetworkTopology>();
|
||||
topologyGen.init(topologyFilename, getConf());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -795,8 +774,8 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
|
|||
*/
|
||||
if (jobID != null
|
||||
&& jobTraceGen != null
|
||||
&& (jobBeingTraced == null || !jobID.equals(jobBeingTraced
|
||||
.getJobID()))) {
|
||||
&& (jobBeingTraced == null
|
||||
|| !jobID.equals(jobBeingTraced.getJobID().toString()))) {
|
||||
// push out the old job if there is one, even though it did't get
|
||||
// mated
|
||||
// with a conf.
|
||||
|
@ -1615,7 +1594,7 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
|
|||
|
||||
private void maybeMateJobAndConf() throws IOException {
|
||||
if (jobBeingTraced != null && jobconf != null
|
||||
&& jobBeingTraced.getJobID().equals(jobconf.jobID)) {
|
||||
&& jobBeingTraced.getJobID().toString().equals(jobconf.jobID)) {
|
||||
jobBeingTraced.setHeapMegabytes(jobconf.heapMegabytes);
|
||||
|
||||
jobBeingTraced.setQueue(jobconf.queue);
|
||||
|
@ -1692,9 +1671,7 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
|
|||
jobBeingTraced.setMapperTriesToSucceed(null);
|
||||
}
|
||||
|
||||
jobTraceGen.writeObject(jobBeingTraced);
|
||||
|
||||
jobTraceGen.writeRaw("\n");
|
||||
jobTraceGen.output(jobBeingTraced);
|
||||
|
||||
jobBeingTraced = null;
|
||||
}
|
||||
|
@ -1792,7 +1769,7 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
|
|||
if (topologyGen != null) {
|
||||
LoggedNetworkTopology topo =
|
||||
new LoggedNetworkTopology(allHosts, "<root>", 0);
|
||||
topologyGen.writeObject(topo);
|
||||
topologyGen.output(topo);
|
||||
topologyGen.close();
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.regex.Matcher;
|
|||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.mapred.TaskStatus;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
|
||||
|
@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
|
|||
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
|
||||
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
|
@ -83,11 +85,6 @@ public class JobBuilder {
|
|||
* The number of splits a task can have, before we ignore them all.
|
||||
*/
|
||||
private final static int MAXIMUM_PREFERRED_LOCATIONS = 25;
|
||||
/**
|
||||
* The regular expression used to parse task attempt IDs in job tracker logs
|
||||
*/
|
||||
private final static Pattern taskAttemptIDPattern =
|
||||
Pattern.compile(".*_([0-9]+)");
|
||||
|
||||
private int[] attemptTimesPercentiles = null;
|
||||
|
||||
|
@ -262,7 +259,9 @@ public class JobBuilder {
|
|||
finalized = true;
|
||||
|
||||
// set the conf
|
||||
if (jobConfigurationParameters != null) {
|
||||
result.setJobProperties(jobConfigurationParameters);
|
||||
}
|
||||
|
||||
// initialize all the per-job statistics gathering places
|
||||
Histogram[] successfulMapAttemptTimes =
|
||||
|
@ -314,20 +313,10 @@ public class JobBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
String attemptID = attempt.getAttemptID();
|
||||
TaskAttemptID attemptID = attempt.getAttemptID();
|
||||
|
||||
if (attemptID != null) {
|
||||
Matcher matcher = taskAttemptIDPattern.matcher(attemptID);
|
||||
|
||||
if (matcher.matches()) {
|
||||
String attemptNumberString = matcher.group(1);
|
||||
|
||||
if (attemptNumberString != null) {
|
||||
int attemptNumber = Integer.parseInt(attemptNumberString);
|
||||
|
||||
successfulNthMapperAttempts.enter(attemptNumber);
|
||||
}
|
||||
}
|
||||
successfulNthMapperAttempts.enter(attemptID.getId());
|
||||
}
|
||||
} else {
|
||||
if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) {
|
||||
|
|
|
@ -21,10 +21,16 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.hadoop.mapreduce.ID;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.DataType;
|
||||
import org.apache.hadoop.tools.rumen.serializers.DefaultRumenSerializer;
|
||||
import org.apache.hadoop.tools.rumen.serializers.ObjectStringSerializer;
|
||||
import org.codehaus.jackson.JsonEncoding;
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.Version;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.map.SerializationConfig;
|
||||
import org.codehaus.jackson.map.module.SimpleModule;
|
||||
|
||||
/**
|
||||
* Simple wrapper around {@link JsonGenerator} to write objects in JSON format.
|
||||
|
@ -37,6 +43,19 @@ public class JsonObjectMapperWriter<T> implements Closeable {
|
|||
ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(
|
||||
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
|
||||
|
||||
// define a module
|
||||
SimpleModule module = new SimpleModule("Default Serializer",
|
||||
new Version(0, 1, 1, "FINAL"));
|
||||
// add various serializers to the module
|
||||
// add default (all-pass) serializer for all rumen specific data types
|
||||
module.addSerializer(DataType.class, new DefaultRumenSerializer());
|
||||
// add a serializer to use object.toString() while serializing
|
||||
module.addSerializer(ID.class, new ObjectStringSerializer<ID>());
|
||||
|
||||
// register the module with the object-mapper
|
||||
mapper.registerModule(module);
|
||||
|
||||
mapper.getJsonFactory();
|
||||
writer = mapper.getJsonFactory().createJsonGenerator(
|
||||
output, JsonEncoding.UTF8);
|
||||
|
|
|
@ -27,6 +27,8 @@ import java.util.Properties;
|
|||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.*;
|
||||
import org.codehaus.jackson.annotate.JsonAnySetter;
|
||||
|
||||
/**
|
||||
|
@ -50,8 +52,8 @@ public class LoggedJob implements DeepCompare {
|
|||
static private Set<String> alreadySeenAnySetterAttributes =
|
||||
new TreeSet<String>();
|
||||
|
||||
String jobID;
|
||||
String user;
|
||||
JobID jobID;
|
||||
UserName user;
|
||||
long computonsPerMapInputByte = -1L;
|
||||
long computonsPerMapOutputByte = -1L;
|
||||
long computonsPerReduceInputByte = -1L;
|
||||
|
@ -80,9 +82,9 @@ public class LoggedJob implements DeepCompare {
|
|||
LoggedDiscreteCDF successfulReduceAttemptCDF;
|
||||
LoggedDiscreteCDF failedReduceAttemptCDF;
|
||||
|
||||
String queue = null;
|
||||
QueueName queue = null;
|
||||
|
||||
String jobName = null;
|
||||
JobName jobName = null;
|
||||
|
||||
int clusterMapMB = -1;
|
||||
int clusterReduceMB = -1;
|
||||
|
@ -94,7 +96,7 @@ public class LoggedJob implements DeepCompare {
|
|||
double[] mapperTriesToSucceed;
|
||||
double failedMapperFraction; // !!!!!
|
||||
|
||||
private Properties jobProperties = new Properties();
|
||||
private JobProperties jobProperties = new JobProperties();
|
||||
|
||||
LoggedJob() {
|
||||
|
||||
|
@ -110,13 +112,13 @@ public class LoggedJob implements DeepCompare {
|
|||
* Set the configuration properties of the job.
|
||||
*/
|
||||
void setJobProperties(Properties conf) {
|
||||
this.jobProperties = conf;
|
||||
this.jobProperties = new JobProperties(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the configuration properties of the job.
|
||||
*/
|
||||
public Properties getJobProperties() {
|
||||
public JobProperties getJobProperties() {
|
||||
return jobProperties;
|
||||
}
|
||||
|
||||
|
@ -138,7 +140,6 @@ public class LoggedJob implements DeepCompare {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
// for input parameter ignored.
|
||||
@JsonAnySetter
|
||||
public void setUnknownAttribute(String attributeName, Object ignored) {
|
||||
|
@ -149,20 +150,20 @@ public class LoggedJob implements DeepCompare {
|
|||
}
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
public UserName getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
void setUser(String user) {
|
||||
this.user = user;
|
||||
this.user = new UserName(user);
|
||||
}
|
||||
|
||||
public String getJobID() {
|
||||
public JobID getJobID() {
|
||||
return jobID;
|
||||
}
|
||||
|
||||
void setJobID(String jobID) {
|
||||
this.jobID = jobID;
|
||||
this.jobID = JobID.forName(jobID);
|
||||
}
|
||||
|
||||
public JobPriority getPriority() {
|
||||
|
@ -359,20 +360,20 @@ public class LoggedJob implements DeepCompare {
|
|||
this.relativeTime = relativeTime;
|
||||
}
|
||||
|
||||
public String getQueue() {
|
||||
public QueueName getQueue() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
void setQueue(String queue) {
|
||||
this.queue = queue;
|
||||
this.queue = new QueueName(queue);
|
||||
}
|
||||
|
||||
public String getJobName() {
|
||||
public JobName getJobName() {
|
||||
return jobName;
|
||||
}
|
||||
|
||||
void setJobName(String jobName) {
|
||||
this.jobName = jobName;
|
||||
this.jobName = new JobName(jobName);
|
||||
}
|
||||
|
||||
public int getClusterMapMB() {
|
||||
|
@ -555,33 +556,52 @@ public class LoggedJob implements DeepCompare {
|
|||
}
|
||||
}
|
||||
|
||||
private void compareJobProperties(Properties prop1, Properties prop2,
|
||||
private void compareJobProperties(JobProperties jprop1, JobProperties jprop2,
|
||||
TreePath loc, String eltname)
|
||||
throws DeepInequalityException {
|
||||
if (prop1 == null && prop2 == null) {
|
||||
if (jprop1 == null && jprop2 == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (prop1 == null || prop2 == null) {
|
||||
throw new DeepInequalityException(eltname + " miscompared [null]",
|
||||
if (jprop1 == null || jprop2 == null) {
|
||||
throw new DeepInequalityException(eltname + " miscompared",
|
||||
new TreePath(loc, eltname));
|
||||
}
|
||||
|
||||
Properties prop1 = jprop1.getValue();
|
||||
Properties prop2 = jprop2.getValue();
|
||||
|
||||
if (prop1.size() != prop2.size()) {
|
||||
throw new DeepInequalityException(eltname + " miscompared [size]",
|
||||
new TreePath(loc, eltname));
|
||||
}
|
||||
|
||||
for (Map.Entry<Object, Object> entry : prop1.entrySet()) {
|
||||
Object v1 = entry.getValue();
|
||||
Object v2 = prop2.get(entry.getKey());
|
||||
if (v1 == null || v2 == null || !v1.equals(v2)) {
|
||||
throw new DeepInequalityException(
|
||||
eltname + " miscompared for value of key : "
|
||||
+ entry.getKey().toString(),
|
||||
String v1 = entry.getValue().toString();
|
||||
String v2 = prop2.get(entry.getKey()).toString();
|
||||
compare1(v1, v2, new TreePath(loc, eltname), "key:" + entry.getKey());
|
||||
}
|
||||
}
|
||||
|
||||
private void compare1(DataType<String> c1, DataType<String> c2, TreePath loc,
|
||||
String eltname)
|
||||
throws DeepInequalityException {
|
||||
if (c1 == null && c2 == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (c1 == null || c2 == null) {
|
||||
throw new DeepInequalityException(eltname + " miscompared",
|
||||
new TreePath(loc, eltname));
|
||||
}
|
||||
TreePath dtPath = new TreePath(loc, eltname);
|
||||
|
||||
if (!c1.getClass().getName().equals(c2.getClass().getName())) {
|
||||
throw new DeepInequalityException(eltname + " miscompared",
|
||||
new TreePath(dtPath, "class"));
|
||||
}
|
||||
|
||||
compare1(c1.getValue(), c2.getValue(), dtPath, "value");
|
||||
}
|
||||
|
||||
public void deepCompare(DeepCompare comparand, TreePath loc)
|
||||
|
@ -592,7 +612,7 @@ public class LoggedJob implements DeepCompare {
|
|||
|
||||
LoggedJob other = (LoggedJob) comparand;
|
||||
|
||||
compare1(jobID, other.jobID, loc, "jobID");
|
||||
compare1(jobID.toString(), other.jobID.toString(), loc, "jobID");
|
||||
compare1(user, other.user, loc, "user");
|
||||
|
||||
compare1(computonsPerMapInputByte, other.computonsPerMapInputByte, loc,
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.tools.rumen.datatypes.NodeName;
|
||||
import org.codehaus.jackson.annotate.JsonAnySetter;
|
||||
|
||||
/**
|
||||
|
@ -44,20 +45,20 @@ import org.codehaus.jackson.annotate.JsonAnySetter;
|
|||
*
|
||||
*/
|
||||
public class LoggedLocation implements DeepCompare {
|
||||
static final Map<List<String>, List<String>> layersCache =
|
||||
new HashMap<List<String>, List<String>>();
|
||||
static final Map<List<String>, List<NodeName>> layersCache =
|
||||
new HashMap<List<String>, List<NodeName>>();
|
||||
|
||||
/**
|
||||
* The full path from the root of the network to the host.
|
||||
*
|
||||
* NOTE that this assumes that the network topology is a tree.
|
||||
*/
|
||||
List<String> layers = Collections.emptyList();
|
||||
List<NodeName> layers = Collections.emptyList();
|
||||
|
||||
static private Set<String> alreadySeenAnySetterAttributes =
|
||||
new TreeSet<String>();
|
||||
|
||||
public List<String> getLayers() {
|
||||
public List<NodeName> getLayers() {
|
||||
return layers;
|
||||
}
|
||||
|
||||
|
@ -66,16 +67,17 @@ public class LoggedLocation implements DeepCompare {
|
|||
this.layers = Collections.emptyList();
|
||||
} else {
|
||||
synchronized (layersCache) {
|
||||
List<String> found = layersCache.get(layers);
|
||||
List<NodeName> found = layersCache.get(layers);
|
||||
if (found == null) {
|
||||
// make a copy with interned string.
|
||||
List<String> clone = new ArrayList<String>(layers.size());
|
||||
for (String s : layers) {
|
||||
clone.add(s.intern());
|
||||
}
|
||||
List<NodeName> clone = new ArrayList<NodeName>(layers.size());
|
||||
clone.add(new NodeName(layers.get(0).intern(), null));
|
||||
clone.add(new NodeName(null, layers.get(1).intern()));
|
||||
|
||||
// making it read-only as we are sharing them.
|
||||
List<String> readonlyLayers = Collections.unmodifiableList(clone);
|
||||
layersCache.put(readonlyLayers, readonlyLayers);
|
||||
List<NodeName> readonlyLayers = Collections.unmodifiableList(clone);
|
||||
List<String> readonlyLayersKey = Collections.unmodifiableList(layers);
|
||||
layersCache.put(readonlyLayersKey, readonlyLayers);
|
||||
this.layers = readonlyLayers;
|
||||
} else {
|
||||
this.layers = found;
|
||||
|
@ -84,7 +86,6 @@ public class LoggedLocation implements DeepCompare {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
// for input parameter ignored.
|
||||
@JsonAnySetter
|
||||
public void setUnknownAttribute(String attributeName, Object ignored) {
|
||||
|
@ -96,17 +97,33 @@ public class LoggedLocation implements DeepCompare {
|
|||
}
|
||||
|
||||
// I'll treat this as an atomic object type
|
||||
private void compareStrings(List<String> c1, List<String> c2, TreePath loc,
|
||||
String eltname) throws DeepInequalityException {
|
||||
private void compareStrings(List<NodeName> c1, List<NodeName> c2,
|
||||
TreePath loc, String eltname)
|
||||
throws DeepInequalityException {
|
||||
if (c1 == null && c2 == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
TreePath recursePath = new TreePath(loc, eltname);
|
||||
|
||||
if (c1 == null || c2 == null || !c1.equals(c2)) {
|
||||
if (c1 == null || c2 == null || (c1.size() != c2.size())) {
|
||||
throw new DeepInequalityException(eltname + " miscompared", recursePath);
|
||||
}
|
||||
|
||||
for (NodeName n1 : c1) {
|
||||
boolean found = false;
|
||||
for (NodeName n2 : c2) {
|
||||
if (n1.getValue().equals(n2.getValue())) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!found) {
|
||||
throw new DeepInequalityException(eltname
|
||||
+ " miscompared [" + n1.getValue() +"]", recursePath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void deepCompare(DeepCompare comparand, TreePath loc)
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.TreeSet;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
|
||||
import org.apache.hadoop.tools.rumen.datatypes.NodeName;
|
||||
import org.codehaus.jackson.annotate.JsonAnySetter;
|
||||
|
||||
/**
|
||||
|
@ -40,7 +41,7 @@ import org.codehaus.jackson.annotate.JsonAnySetter;
|
|||
*
|
||||
*/
|
||||
public class LoggedNetworkTopology implements DeepCompare {
|
||||
String name;
|
||||
NodeName name;
|
||||
List<LoggedNetworkTopology> children = new ArrayList<LoggedNetworkTopology>();
|
||||
|
||||
static private Set<String> alreadySeenAnySetterAttributes =
|
||||
|
@ -50,7 +51,6 @@ public class LoggedNetworkTopology implements DeepCompare {
|
|||
super();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
// for input parameter ignored.
|
||||
@JsonAnySetter
|
||||
public void setUnknownAttribute(String attributeName, Object ignored) {
|
||||
|
@ -70,7 +70,7 @@ public class LoggedNetworkTopology implements DeepCompare {
|
|||
*/
|
||||
static class TopoSort implements Comparator<LoggedNetworkTopology> {
|
||||
public int compare(LoggedNetworkTopology t1, LoggedNetworkTopology t2) {
|
||||
return t1.name.compareTo(t2.name);
|
||||
return t1.name.getValue().compareTo(t2.name.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -83,8 +83,11 @@ public class LoggedNetworkTopology implements DeepCompare {
|
|||
* the level number
|
||||
*/
|
||||
LoggedNetworkTopology(Set<ParsedHost> hosts, String name, int level) {
|
||||
|
||||
this.name = name;
|
||||
if (name == null) {
|
||||
this.name = NodeName.ROOT;
|
||||
} else {
|
||||
this.name = new NodeName(name);
|
||||
}
|
||||
this.children = null;
|
||||
|
||||
if (level < ParsedHost.numberOfDistances() - 1) {
|
||||
|
@ -120,15 +123,15 @@ public class LoggedNetworkTopology implements DeepCompare {
|
|||
}
|
||||
|
||||
LoggedNetworkTopology(Set<ParsedHost> hosts) {
|
||||
this(hosts, "<root>", 0);
|
||||
this(hosts, null, 0);
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
public NodeName getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
void setName(String name) {
|
||||
this.name = name;
|
||||
this.name = new NodeName(name);
|
||||
}
|
||||
|
||||
public List<LoggedNetworkTopology> getChildren() {
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.mapreduce.jobhistory.Events;
|
||||
import org.apache.hadoop.mapreduce.TaskID;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
|
||||
|
@ -44,7 +44,7 @@ public class LoggedTask implements DeepCompare {
|
|||
long inputRecords = -1L;
|
||||
long outputBytes = -1L;
|
||||
long outputRecords = -1L;
|
||||
String taskID;
|
||||
TaskID taskID;
|
||||
long startTime = -1L;
|
||||
long finishTime = -1L;
|
||||
Pre21JobHistoryConstants.Values taskType;
|
||||
|
@ -55,7 +55,6 @@ public class LoggedTask implements DeepCompare {
|
|||
static private Set<String> alreadySeenAnySetterAttributes =
|
||||
new TreeSet<String>();
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
// for input parameter ignored.
|
||||
@JsonAnySetter
|
||||
public void setUnknownAttribute(String attributeName, Object ignored) {
|
||||
|
@ -111,12 +110,12 @@ public class LoggedTask implements DeepCompare {
|
|||
this.outputRecords = outputRecords;
|
||||
}
|
||||
|
||||
public String getTaskID() {
|
||||
public TaskID getTaskID() {
|
||||
return taskID;
|
||||
}
|
||||
|
||||
void setTaskID(String taskID) {
|
||||
this.taskID = taskID;
|
||||
this.taskID = TaskID.forName(taskID);
|
||||
}
|
||||
|
||||
public long getStartTime() {
|
||||
|
@ -357,7 +356,7 @@ public class LoggedTask implements DeepCompare {
|
|||
compare1(outputBytes, other.outputBytes, loc, "outputBytes");
|
||||
compare1(outputRecords, other.outputRecords, loc, "outputRecords");
|
||||
|
||||
compare1(taskID, other.taskID, loc, "taskID");
|
||||
compare1(taskID.toString(), other.taskID.toString(), loc, "taskID");
|
||||
|
||||
compare1(startTime, other.startTime, loc, "startTime");
|
||||
compare1(finishTime, other.finishTime, loc, "finishTime");
|
||||
|
|
|
@ -30,9 +30,11 @@ import org.codehaus.jackson.annotate.JsonAnySetter;
|
|||
// the Jackson implementation of JSON doesn't handle a
|
||||
// superclass-valued field.
|
||||
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.NodeName;
|
||||
|
||||
/**
|
||||
* A {@link LoggedTaskAttempt} represents an attempt to run an hadoop task in a
|
||||
|
@ -44,11 +46,11 @@ import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
|
|||
*/
|
||||
public class LoggedTaskAttempt implements DeepCompare {
|
||||
|
||||
String attemptID;
|
||||
TaskAttemptID attemptID;
|
||||
Pre21JobHistoryConstants.Values result;
|
||||
long startTime = -1L;
|
||||
long finishTime = -1L;
|
||||
String hostName;
|
||||
NodeName hostName;
|
||||
|
||||
long hdfsBytesRead = -1L;
|
||||
long hdfsBytesWritten = -1L;
|
||||
|
@ -188,7 +190,6 @@ public class LoggedTaskAttempt implements DeepCompare {
|
|||
static private Set<String> alreadySeenAnySetterAttributes =
|
||||
new TreeSet<String>();
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
// for input parameter ignored.
|
||||
@JsonAnySetter
|
||||
public void setUnknownAttribute(String attributeName, Object ignored) {
|
||||
|
@ -292,12 +293,12 @@ public class LoggedTaskAttempt implements DeepCompare {
|
|||
this.sortFinished = sortFinished;
|
||||
}
|
||||
|
||||
public String getAttemptID() {
|
||||
public TaskAttemptID getAttemptID() {
|
||||
return attemptID;
|
||||
}
|
||||
|
||||
void setAttemptID(String attemptID) {
|
||||
this.attemptID = attemptID;
|
||||
this.attemptID = TaskAttemptID.forName(attemptID);
|
||||
}
|
||||
|
||||
public Pre21JobHistoryConstants.Values getResult() {
|
||||
|
@ -324,15 +325,17 @@ public class LoggedTaskAttempt implements DeepCompare {
|
|||
this.finishTime = finishTime;
|
||||
}
|
||||
|
||||
public String getHostName() {
|
||||
public NodeName getHostName() {
|
||||
return hostName;
|
||||
}
|
||||
|
||||
// This is needed for JSON deserialization
|
||||
void setHostName(String hostName) {
|
||||
this.hostName = hostName;
|
||||
this.hostName = hostName == null ? null : new NodeName(hostName);
|
||||
}
|
||||
|
||||
// hostName is saved in the format rackName/NodeName
|
||||
// In job-history, hostName is saved in the format rackName/NodeName
|
||||
//TODO this is a hack! The '/' handling needs fixing.
|
||||
void setHostName(String hostName, String rackName) {
|
||||
if (hostName == null || hostName.length() == 0) {
|
||||
throw new RuntimeException("Invalid entry! Missing hostname");
|
||||
|
@ -649,6 +652,20 @@ public class LoggedTaskAttempt implements DeepCompare {
|
|||
}
|
||||
}
|
||||
|
||||
private void compare1(NodeName c1, NodeName c2, TreePath loc, String eltname)
|
||||
throws DeepInequalityException {
|
||||
if (c1 == null && c2 == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (c1 == null || c2 == null) {
|
||||
throw new DeepInequalityException(eltname + " miscompared", new TreePath(
|
||||
loc, eltname));
|
||||
}
|
||||
|
||||
compare1(c1.getValue(), c2.getValue(), new TreePath(loc, eltname), "value");
|
||||
}
|
||||
|
||||
private void compare1(long c1, long c2, TreePath loc, String eltname)
|
||||
throws DeepInequalityException {
|
||||
if (c1 != c2) {
|
||||
|
@ -709,7 +726,7 @@ public class LoggedTaskAttempt implements DeepCompare {
|
|||
|
||||
LoggedTaskAttempt other = (LoggedTaskAttempt) comparand;
|
||||
|
||||
compare1(attemptID, other.attemptID, loc, "attemptID");
|
||||
compare1(attemptID.toString(), other.attemptID.toString(), loc, "attemptID");
|
||||
compare1(result, other.result, loc, "result");
|
||||
compare1(startTime, other.startTime, loc, "startTime");
|
||||
compare1(finishTime, other.finishTime, loc, "finishTime");
|
||||
|
|
|
@ -22,7 +22,9 @@ import java.util.List;
|
|||
import java.util.regex.Pattern;
|
||||
import java.util.regex.Matcher;
|
||||
|
||||
class ParsedHost {
|
||||
import org.apache.hadoop.tools.rumen.datatypes.NodeName;
|
||||
|
||||
public class ParsedHost {
|
||||
private final String rackName;
|
||||
private final String nodeName;
|
||||
|
||||
|
@ -70,10 +72,10 @@ class ParsedHost {
|
|||
}
|
||||
|
||||
public ParsedHost(LoggedLocation loc) {
|
||||
List<String> coordinates = loc.getLayers();
|
||||
List<NodeName> coordinates = loc.getLayers();
|
||||
|
||||
rackName = coordinates.get(0);
|
||||
nodeName = coordinates.get(1);
|
||||
rackName = coordinates.get(0).getRackName();
|
||||
nodeName = coordinates.get(1).getHostName();
|
||||
}
|
||||
|
||||
LoggedLocation makeLoggedLocation() {
|
||||
|
@ -89,11 +91,11 @@ class ParsedHost {
|
|||
return result;
|
||||
}
|
||||
|
||||
String getNodeName() {
|
||||
public String getNodeName() {
|
||||
return nodeName;
|
||||
}
|
||||
|
||||
String getRackName() {
|
||||
public String getRackName() {
|
||||
return rackName;
|
||||
}
|
||||
|
||||
|
|
|
@ -124,15 +124,16 @@ public class ZombieCluster extends AbstractClusterStory {
|
|||
int level = levelMapping.get(n);
|
||||
Node current;
|
||||
if (level == leafLevel) { // a machine node
|
||||
MachineNode.Builder builder = new MachineNode.Builder(n.getName(), level);
|
||||
MachineNode.Builder builder =
|
||||
new MachineNode.Builder(n.getName().getValue(), level);
|
||||
if (defaultNode != null) {
|
||||
builder.cloneFrom(defaultNode);
|
||||
}
|
||||
current = builder.build();
|
||||
} else {
|
||||
current = (level == leafLevel - 1)
|
||||
? new RackNode(n.getName(), level) :
|
||||
new Node(n.getName(), level);
|
||||
? new RackNode(n.getName().getValue(), level) :
|
||||
new Node(n.getName().getValue(), level);
|
||||
path[level] = current;
|
||||
// Add all children to the front of the queue.
|
||||
for (LoggedNetworkTopology child : n.getChildren()) {
|
||||
|
|
|
@ -28,12 +28,14 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.TaskStatus.State;
|
||||
import org.apache.hadoop.mapreduce.ID;
|
||||
import org.apache.hadoop.mapreduce.InputSplit;
|
||||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.TaskID;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.*;
|
||||
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
|
||||
|
||||
/**
|
||||
|
@ -128,7 +130,7 @@ public class ZombieJob implements JobStory {
|
|||
// file, are added first because the specialized values obtained from
|
||||
// Rumen should override the job conf values.
|
||||
//
|
||||
for (Map.Entry<Object, Object> entry : job.getJobProperties().entrySet()) {
|
||||
for (Map.Entry<Object, Object> entry : job.getJobProperties().getValue().entrySet()) {
|
||||
jobConf.set(entry.getKey().toString(), entry.getValue().toString());
|
||||
}
|
||||
|
||||
|
@ -161,12 +163,12 @@ public class ZombieJob implements JobStory {
|
|||
List<String> hostList = new ArrayList<String>();
|
||||
if (locations != null) {
|
||||
for (LoggedLocation location : locations) {
|
||||
List<String> layers = location.getLayers();
|
||||
List<NodeName> layers = location.getLayers();
|
||||
if (layers.size() == 0) {
|
||||
LOG.warn("Bad location layer format for task "+mapTask.getTaskID());
|
||||
continue;
|
||||
}
|
||||
String host = layers.get(layers.size() - 1);
|
||||
String host = layers.get(layers.size() - 1).getValue();
|
||||
if (host == null) {
|
||||
LOG.warn("Bad location layer format for task "+mapTask.getTaskID() + ": " + layers);
|
||||
continue;
|
||||
|
@ -226,20 +228,20 @@ public class ZombieJob implements JobStory {
|
|||
|
||||
@Override
|
||||
public String getName() {
|
||||
String jobName = job.getJobName();
|
||||
JobName jobName = job.getJobName();
|
||||
if (jobName == null) {
|
||||
return "(name unknown)";
|
||||
} else {
|
||||
return jobName;
|
||||
return jobName.getValue();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public JobID getJobID() {
|
||||
return JobID.forName(getLoggedJob().getJobID());
|
||||
return getLoggedJob().getJobID();
|
||||
}
|
||||
|
||||
private int sanitizeValue(int oldVal, int defaultVal, String name, String id) {
|
||||
private int sanitizeValue(int oldVal, int defaultVal, String name, JobID id) {
|
||||
if (oldVal == -1) {
|
||||
LOG.warn(name +" not defined for "+id);
|
||||
return defaultVal;
|
||||
|
@ -269,8 +271,10 @@ public class ZombieJob implements JobStory {
|
|||
|
||||
@Override
|
||||
public String getQueueName() {
|
||||
String queue = job.getQueue();
|
||||
return (queue == null)? JobConf.DEFAULT_QUEUE_NAME : queue;
|
||||
QueueName queue = job.getQueue();
|
||||
return (queue == null || queue.getValue() == null)
|
||||
? JobConf.DEFAULT_QUEUE_NAME
|
||||
: queue.getValue();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -357,13 +361,12 @@ public class ZombieJob implements JobStory {
|
|||
for (LoggedTask map : job.getMapTasks()) {
|
||||
map = sanitizeLoggedTask(map);
|
||||
if (map != null) {
|
||||
loggedTaskMap.put(maskTaskID(TaskID.forName(map.taskID)), map);
|
||||
loggedTaskMap.put(maskTaskID(map.taskID), map);
|
||||
|
||||
for (LoggedTaskAttempt mapAttempt : map.getAttempts()) {
|
||||
mapAttempt = sanitizeLoggedTaskAttempt(mapAttempt);
|
||||
if (mapAttempt != null) {
|
||||
TaskAttemptID id = TaskAttemptID.forName(mapAttempt
|
||||
.getAttemptID());
|
||||
TaskAttemptID id = mapAttempt.getAttemptID();
|
||||
loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt);
|
||||
}
|
||||
}
|
||||
|
@ -372,13 +375,12 @@ public class ZombieJob implements JobStory {
|
|||
for (LoggedTask reduce : job.getReduceTasks()) {
|
||||
reduce = sanitizeLoggedTask(reduce);
|
||||
if (reduce != null) {
|
||||
loggedTaskMap.put(maskTaskID(TaskID.forName(reduce.taskID)), reduce);
|
||||
loggedTaskMap.put(maskTaskID(reduce.taskID), reduce);
|
||||
|
||||
for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) {
|
||||
reduceAttempt = sanitizeLoggedTaskAttempt(reduceAttempt);
|
||||
if (reduceAttempt != null) {
|
||||
TaskAttemptID id = TaskAttemptID.forName(reduceAttempt
|
||||
.getAttemptID());
|
||||
TaskAttemptID id = reduceAttempt.getAttemptID();
|
||||
loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt);
|
||||
}
|
||||
}
|
||||
|
@ -391,8 +393,10 @@ public class ZombieJob implements JobStory {
|
|||
|
||||
@Override
|
||||
public String getUser() {
|
||||
String retval = job.getUser();
|
||||
return (retval==null)?"(unknown)":retval;
|
||||
UserName retval = job.getUser();
|
||||
return (retval == null || retval.getValue() == null)
|
||||
? "(unknown)"
|
||||
: retval.getValue();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -511,7 +515,7 @@ public class ZombieJob implements JobStory {
|
|||
}
|
||||
}
|
||||
|
||||
private long sanitizeTaskRuntime(long time, String id) {
|
||||
private long sanitizeTaskRuntime(long time, ID id) {
|
||||
if (time < 0) {
|
||||
LOG.warn("Negative running time for task "+id+": "+time);
|
||||
return 100L; // set default to 100ms.
|
||||
|
@ -547,7 +551,7 @@ public class ZombieJob implements JobStory {
|
|||
|
||||
private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
|
||||
int distance = cluster.getMaximumDistance();
|
||||
String rackHostName = loggedAttempt.getHostName();
|
||||
String rackHostName = loggedAttempt.getHostName().getValue();
|
||||
if (rackHostName == null) {
|
||||
return distance;
|
||||
}
|
||||
|
@ -558,11 +562,11 @@ public class ZombieJob implements JobStory {
|
|||
List<LoggedLocation> locations = loggedTask.getPreferredLocations();
|
||||
if (locations != null) {
|
||||
for (LoggedLocation location : locations) {
|
||||
List<String> layers = location.getLayers();
|
||||
List<NodeName> layers = location.getLayers();
|
||||
if ((layers == null) || (layers.isEmpty())) {
|
||||
continue;
|
||||
}
|
||||
String dataNodeName = layers.get(layers.size()-1);
|
||||
String dataNodeName = layers.get(layers.size()-1).getValue();
|
||||
MachineNode dataNode = cluster.getMachineByName(dataNodeName);
|
||||
if (dataNode != null) {
|
||||
distance = Math.min(distance, cluster.distance(mn, dataNode));
|
||||
|
@ -690,8 +694,8 @@ public class ZombieJob implements JobStory {
|
|||
|
||||
private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber,
|
||||
int taskAttemptNumber) {
|
||||
return new TaskAttemptID(new TaskID(JobID.forName(job.getJobID()),
|
||||
taskType, taskNumber), taskAttemptNumber);
|
||||
return new TaskAttemptID(new TaskID(job.getJobID(), taskType, taskNumber),
|
||||
taskAttemptNumber);
|
||||
}
|
||||
|
||||
private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo,
|
||||
|
@ -704,7 +708,7 @@ public class ZombieJob implements JobStory {
|
|||
state = makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed());
|
||||
runtime = makeUpMapRuntime(state, locality);
|
||||
runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType,
|
||||
taskNumber, taskAttemptNumber).toString());
|
||||
taskNumber, taskAttemptNumber));
|
||||
TaskAttemptInfo tai
|
||||
= new MapTaskAttemptInfo(state, taskInfo, runtime, null);
|
||||
return tai;
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.anonymization;
|
||||
|
||||
import org.apache.hadoop.tools.rumen.state.State;
|
||||
|
||||
/**
|
||||
* The data anonymizer interface.
|
||||
*/
|
||||
public interface DataAnonymizer<T> {
|
||||
T anonymize(T data, State state);
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
package org.apache.hadoop.tools.rumen.anonymization;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.tools.rumen.state.State;
|
||||
|
||||
/**
|
||||
* Represents the list of words used in list-backed anonymizers.
|
||||
*/
|
||||
public class WordList implements State {
|
||||
private Map<String, Integer> list = new HashMap<String, Integer>(0);
|
||||
private boolean isUpdated = false;
|
||||
private String name;
|
||||
|
||||
public WordList() {
|
||||
this("word");
|
||||
}
|
||||
|
||||
public WordList(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the specified word to the list if the word is not already added.
|
||||
*/
|
||||
public void add(String word) {
|
||||
if (!contains(word)) {
|
||||
int index = getSize();
|
||||
list.put(word, index);
|
||||
isUpdated = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns 'true' if the list contains the specified word.
|
||||
*/
|
||||
public boolean contains(String word) {
|
||||
return list.containsKey(word);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the index of the specified word in the list.
|
||||
*/
|
||||
public int indexOf(String word) {
|
||||
return list.get(word);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the size of the list.
|
||||
*/
|
||||
public int getSize() {
|
||||
return list.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns 'true' if the list is updated since creation (and reload).
|
||||
*/
|
||||
@Override
|
||||
public boolean isUpdated() {
|
||||
return isUpdated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setters and getters for Jackson JSON
|
||||
*/
|
||||
/**
|
||||
* Sets the size of the list.
|
||||
*
|
||||
* Note: That this API is only for Jackson JSON deserialization.
|
||||
*/
|
||||
public void setSize(int size) {
|
||||
list = new HashMap<String, Integer>(size);
|
||||
}
|
||||
|
||||
/**
|
||||
* Note: That this API is only for Jackson JSON deserialization.
|
||||
*/
|
||||
@Override
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the words.
|
||||
*
|
||||
* Note: That this API is only for Jackson JSON serialization.
|
||||
*/
|
||||
public Map<String, Integer> getWords() {
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the words.
|
||||
*
|
||||
* Note: That this API is only for Jackson JSON deserialization.
|
||||
*/
|
||||
public void setWords(Map<String, Integer> list) {
|
||||
this.list = list;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,110 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.anonymization;
|
||||
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
/**
|
||||
* Utility class to handle commonly performed tasks in a
|
||||
* {@link org.apache.hadoop.tools.rumen.datatypes.DefaultAnonymizableDataType}
|
||||
* using a {@link WordList} for anonymization.
|
||||
* //TODO There is no caching for saving memory.
|
||||
*/
|
||||
public class WordListAnonymizerUtility {
|
||||
public static final String[] KNOWN_WORDS =
|
||||
new String[] {"job", "tmp", "temp", "home", "homes", "usr", "user", "test"};
|
||||
|
||||
/**
|
||||
* Checks if the data needs anonymization. Typically, data types which are
|
||||
* numeric in nature doesn't need anonymization.
|
||||
*/
|
||||
public static boolean needsAnonymization(String data) {
|
||||
// Numeric data doesn't need anonymization
|
||||
// Currently this doesnt support inputs like
|
||||
// - 12.3
|
||||
// - 12.3f
|
||||
// - 90L
|
||||
// - 1D
|
||||
if (StringUtils.isNumeric(data)) {
|
||||
return false;
|
||||
}
|
||||
return true; // by default return true
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given data has a known suffix.
|
||||
*/
|
||||
public static boolean hasSuffix(String data, String[] suffixes) {
|
||||
// check if they end in known suffixes
|
||||
for (String ks : suffixes) {
|
||||
if (data.endsWith(ks)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts a known suffix from the given data.
|
||||
*
|
||||
* @throws RuntimeException if the data doesn't have a suffix.
|
||||
* Use {@link #hasSuffix(String, String[])} to make sure that the
|
||||
* given data has a suffix.
|
||||
*/
|
||||
public static String[] extractSuffix(String data, String[] suffixes) {
|
||||
// check if they end in known suffixes
|
||||
String suffix = "";
|
||||
for (String ks : suffixes) {
|
||||
if (data.endsWith(ks)) {
|
||||
suffix = ks;
|
||||
// stripe off the suffix which will get appended later
|
||||
data = data.substring(0, data.length() - suffix.length());
|
||||
return new String[] {data, suffix};
|
||||
}
|
||||
}
|
||||
|
||||
// throw exception
|
||||
throw new RuntimeException("Data [" + data + "] doesn't have a suffix from"
|
||||
+ " known suffixes [" + StringUtils.join(suffixes, ',') + "]");
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given data is known. This API uses {@link #KNOWN_WORDS} to
|
||||
* detect if the given data is a commonly used (so called 'known') word.
|
||||
*/
|
||||
public static boolean isKnownData(String data) {
|
||||
return isKnownData(data, KNOWN_WORDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given data is known.
|
||||
*/
|
||||
public static boolean isKnownData(String data, String[] knownWords) {
|
||||
// check if the data is known content
|
||||
//TODO [Chunking] Do this for sub-strings of data
|
||||
|
||||
for (String kd : knownWords) {
|
||||
if (data.equals(kd)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.tools.rumen.state.StatePool;
|
||||
|
||||
/**
|
||||
* An interface for data-types that can be anonymized.
|
||||
*/
|
||||
public interface AnonymizableDataType<T> extends DataType<T> {
|
||||
public T getAnonymizedValue(StatePool statePool, Configuration conf);
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* Represents a class name.
|
||||
*/
|
||||
public class ClassName extends DefaultAnonymizableDataType {
|
||||
public static final String CLASSNAME_PRESERVE_CONFIG = "rumen.data-types.classname.preserve";
|
||||
private final String className;
|
||||
|
||||
public ClassName(String className) {
|
||||
super();
|
||||
this.className = className;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return className;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getPrefix() {
|
||||
return "class";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean needsAnonymization(Configuration conf) {
|
||||
String[] preserves = conf.getStrings(CLASSNAME_PRESERVE_CONFIG);
|
||||
if (preserves != null) {
|
||||
// do a simple starts with check
|
||||
for (String p : preserves) {
|
||||
if (className.startsWith(p)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes;
|
||||
|
||||
/**
|
||||
* Represents a Rumen data-type.
|
||||
*/
|
||||
public interface DataType<T> {
|
||||
T getValue();
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.tools.rumen.anonymization.WordList;
|
||||
import org.apache.hadoop.tools.rumen.anonymization.WordListAnonymizerUtility;
|
||||
import org.apache.hadoop.tools.rumen.state.StatePool;
|
||||
|
||||
/**
|
||||
* Represents a default anonymizable Rumen data-type. It uses
|
||||
* {@link WordListAnonymizerUtility} for anonymization.
|
||||
*/
|
||||
public abstract class DefaultAnonymizableDataType
|
||||
implements AnonymizableDataType<String> {
|
||||
private static final String DEFAULT_PREFIX = "data";
|
||||
|
||||
protected String getPrefix() {
|
||||
return DEFAULT_PREFIX;
|
||||
}
|
||||
|
||||
// Determines if the contained data needs anonymization
|
||||
protected boolean needsAnonymization(Configuration conf) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String getAnonymizedValue(StatePool statePool,
|
||||
Configuration conf) {
|
||||
if (needsAnonymization(conf)) {
|
||||
WordList state = (WordList) statePool.getState(getClass());
|
||||
if (state == null) {
|
||||
state = new WordList(getPrefix());
|
||||
statePool.addState(getClass(), state);
|
||||
}
|
||||
return anonymize(getValue(), state);
|
||||
} else {
|
||||
return getValue();
|
||||
}
|
||||
}
|
||||
|
||||
private static String anonymize(String data, WordList wordList) {
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!wordList.contains(data)) {
|
||||
wordList.add(data);
|
||||
}
|
||||
return wordList.getName() + wordList.indexOf(data);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes;
|
||||
|
||||
/**
|
||||
* This represents the default java data-types (like int, long, float etc).
|
||||
*/
|
||||
public class DefaultDataType implements DataType<String> {
|
||||
private String value;
|
||||
|
||||
public DefaultDataType(String value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the value of the attribute.
|
||||
*/
|
||||
@Override
|
||||
public String getValue() {
|
||||
return value;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,213 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.tools.rumen.anonymization.WordList;
|
||||
import org.apache.hadoop.tools.rumen.anonymization.WordListAnonymizerUtility;
|
||||
import org.apache.hadoop.tools.rumen.state.State;
|
||||
import org.apache.hadoop.tools.rumen.state.StatePool;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Represents a file's location.
|
||||
*
|
||||
* Currently, only filenames that can be represented using {@link Path} are
|
||||
* supported.
|
||||
*/
|
||||
public class FileName implements AnonymizableDataType<String> {
|
||||
private final String fileName;
|
||||
private String anonymizedFileName;
|
||||
private static final String PREV_DIR = "..";
|
||||
private static final String[] KNOWN_SUFFIXES =
|
||||
new String[] {".xml", ".jar", ".txt", ".tar", ".zip", ".json", ".gzip",
|
||||
".lzo"};
|
||||
|
||||
/**
|
||||
* A composite state for filename.
|
||||
*/
|
||||
public static class FileNameState implements State {
|
||||
private WordList dirState = new WordList("dir");
|
||||
private WordList fileNameState = new WordList("file");
|
||||
|
||||
@Override
|
||||
public boolean isUpdated() {
|
||||
return dirState.isUpdated() || fileNameState.isUpdated();
|
||||
}
|
||||
|
||||
public WordList getDirectoryState() {
|
||||
return dirState;
|
||||
}
|
||||
|
||||
public WordList getFileNameState() {
|
||||
return fileNameState;
|
||||
}
|
||||
|
||||
public void setDirectoryState(WordList state) {
|
||||
this.dirState = state;
|
||||
}
|
||||
|
||||
public void setFileNameState(WordList state) {
|
||||
this.fileNameState = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "path";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setName(String name) {
|
||||
// for now, simply assert since this class has a hardcoded name
|
||||
if (!getName().equals(name)) {
|
||||
throw new RuntimeException("State name mismatch! Expected '"
|
||||
+ getName() + "' but found '" + name + "'.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public FileName(String fileName) {
|
||||
this.fileName = fileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return fileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAnonymizedValue(StatePool statePool,
|
||||
Configuration conf) {
|
||||
if (anonymizedFileName == null) {
|
||||
anonymize(statePool, conf);
|
||||
}
|
||||
return anonymizedFileName;
|
||||
}
|
||||
|
||||
private void anonymize(StatePool statePool, Configuration conf) {
|
||||
FileNameState fState = (FileNameState) statePool.getState(getClass());
|
||||
if (fState == null) {
|
||||
fState = new FileNameState();
|
||||
statePool.addState(getClass(), fState);
|
||||
}
|
||||
|
||||
String[] files = StringUtils.split(fileName);
|
||||
String[] anonymizedFileNames = new String[files.length];
|
||||
int i = 0;
|
||||
for (String f : files) {
|
||||
anonymizedFileNames[i++] =
|
||||
anonymize(statePool, conf, fState, f);
|
||||
}
|
||||
|
||||
anonymizedFileName = StringUtils.arrayToString(anonymizedFileNames);
|
||||
}
|
||||
|
||||
private static String anonymize(StatePool statePool, Configuration conf,
|
||||
FileNameState fState, String fileName) {
|
||||
String ret = null;
|
||||
try {
|
||||
URI uri = new URI(fileName);
|
||||
|
||||
// anonymize the path i.e without the authority & scheme
|
||||
ret =
|
||||
anonymizePath(uri.getPath(), fState.getDirectoryState(),
|
||||
fState.getFileNameState());
|
||||
|
||||
// anonymize the authority and scheme
|
||||
String authority = uri.getAuthority();
|
||||
String scheme = uri.getScheme();
|
||||
if (scheme != null) {
|
||||
String anonymizedAuthority = "";
|
||||
if (authority != null) {
|
||||
// anonymize the authority
|
||||
NodeName hostName = new NodeName(null, uri.getHost());
|
||||
anonymizedAuthority = hostName.getAnonymizedValue(statePool, conf);
|
||||
}
|
||||
ret = scheme + "://" + anonymizedAuthority + ret;
|
||||
}
|
||||
} catch (URISyntaxException use) {
|
||||
throw new RuntimeException (use);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Anonymize the file-path
|
||||
private static String anonymizePath(String path, WordList dState,
|
||||
WordList fState) {
|
||||
StringBuilder buffer = new StringBuilder();
|
||||
StringTokenizer tokenizer = new StringTokenizer(path, Path.SEPARATOR, true);
|
||||
while (tokenizer.hasMoreTokens()) {
|
||||
String token = tokenizer.nextToken();
|
||||
if (Path.SEPARATOR.equals(token)) {
|
||||
buffer.append(token);
|
||||
} else if (Path.CUR_DIR.equals(token)) {
|
||||
buffer.append(token);
|
||||
} else if (PREV_DIR.equals(token)) {
|
||||
buffer.append(token);
|
||||
} else if (tokenizer.hasMoreTokens()){
|
||||
// this component is a directory
|
||||
buffer.append(anonymize(token, dState));
|
||||
} else {
|
||||
// this component is a file
|
||||
buffer.append(anonymize(token, fState));
|
||||
}
|
||||
}
|
||||
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
//TODO There is no caching for saving memory.
|
||||
private static String anonymize(String data, WordList wordList) {
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (WordListAnonymizerUtility.needsAnonymization(data)) {
|
||||
String suffix = "";
|
||||
String coreData = data;
|
||||
// check and extract suffix
|
||||
if (WordListAnonymizerUtility.hasSuffix(data, KNOWN_SUFFIXES)) {
|
||||
// check if the data ends with a known suffix
|
||||
String[] split =
|
||||
WordListAnonymizerUtility.extractSuffix(data, KNOWN_SUFFIXES);
|
||||
suffix = split[1];
|
||||
coreData = split[0];
|
||||
}
|
||||
|
||||
// check if the data is known content
|
||||
//TODO [Chunking] Do this for sub-strings of data
|
||||
String anonymizedData = coreData;
|
||||
if (!WordListAnonymizerUtility.isKnownData(coreData)) {
|
||||
if (!wordList.contains(coreData)) {
|
||||
wordList.add(coreData);
|
||||
}
|
||||
anonymizedData = wordList.getName() + wordList.indexOf(coreData);
|
||||
}
|
||||
|
||||
return anonymizedData + suffix;
|
||||
} else {
|
||||
return data;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes;
|
||||
|
||||
|
||||
/**
|
||||
* Represents a job's name.
|
||||
*/
|
||||
public class JobName extends DefaultAnonymizableDataType {
|
||||
private final String jobName;
|
||||
|
||||
public JobName(String jobName) {
|
||||
super();
|
||||
this.jobName = jobName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return jobName;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getPrefix() {
|
||||
return "job";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.util.JobPropertyParser;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.util.MapReduceJobPropertiesParser;
|
||||
import org.apache.hadoop.tools.rumen.state.StatePool;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* This represents the job configuration properties.
|
||||
*/
|
||||
public class JobProperties implements AnonymizableDataType<Properties> {
|
||||
public static final String PARSERS_CONFIG_KEY =
|
||||
"rumen.datatypes.jobproperties.parsers";
|
||||
private final Properties jobProperties;
|
||||
|
||||
public JobProperties() {
|
||||
this(new Properties());
|
||||
}
|
||||
|
||||
public JobProperties(Properties properties) {
|
||||
this.jobProperties = properties;
|
||||
}
|
||||
|
||||
public Properties getValue() {
|
||||
return jobProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Properties getAnonymizedValue(StatePool statePool,
|
||||
Configuration conf) {
|
||||
Properties filteredProperties = null;
|
||||
List<JobPropertyParser> pList = new ArrayList<JobPropertyParser>(1);
|
||||
// load the parsers
|
||||
String config = conf.get(PARSERS_CONFIG_KEY);
|
||||
if (config != null) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Class<JobPropertyParser>[] parsers =
|
||||
(Class[])conf.getClasses(PARSERS_CONFIG_KEY);
|
||||
for (Class<JobPropertyParser> c : parsers) {
|
||||
JobPropertyParser parser = ReflectionUtils.newInstance(c, conf);
|
||||
pList.add(parser);
|
||||
}
|
||||
} else {
|
||||
// add the default MapReduce filter
|
||||
JobPropertyParser parser = new MapReduceJobPropertiesParser();
|
||||
pList.add(parser);
|
||||
}
|
||||
|
||||
// filter out the desired config key-value pairs
|
||||
if (jobProperties != null) {
|
||||
filteredProperties = new Properties();
|
||||
// define a configuration object and load it with original job properties
|
||||
for (Map.Entry<Object, Object> entry : jobProperties.entrySet()) {
|
||||
//TODO Check for null key/value?
|
||||
String key = entry.getKey().toString();
|
||||
String value = entry.getValue().toString();
|
||||
|
||||
// find a parser for this key
|
||||
for (JobPropertyParser p : pList) {
|
||||
DataType<?> pValue = p.parseJobProperty(key, value);
|
||||
if (pValue != null) {
|
||||
filteredProperties.put(key, pValue);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return filteredProperties;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,185 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.tools.rumen.ParsedHost;
|
||||
import org.apache.hadoop.tools.rumen.anonymization.WordList;
|
||||
import org.apache.hadoop.tools.rumen.state.State;
|
||||
import org.apache.hadoop.tools.rumen.state.StatePool;
|
||||
import org.codehaus.jackson.annotate.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Represents the cluster host.
|
||||
*/
|
||||
public class NodeName implements AnonymizableDataType<String> {
|
||||
private String hostName;
|
||||
private String rackName;
|
||||
private String nodeName;
|
||||
private String anonymizedNodeName;
|
||||
|
||||
public static final NodeName ROOT = new NodeName("<root>");
|
||||
|
||||
/**
|
||||
* A composite state for node-name.
|
||||
*/
|
||||
public static class NodeNameState implements State {
|
||||
private WordList rackNameState = new WordList("rack");
|
||||
private WordList hostNameState = new WordList("host");
|
||||
|
||||
@Override
|
||||
@JsonIgnore
|
||||
public boolean isUpdated() {
|
||||
return rackNameState.isUpdated() || hostNameState.isUpdated();
|
||||
}
|
||||
|
||||
public WordList getRackNameState() {
|
||||
return rackNameState;
|
||||
}
|
||||
|
||||
public WordList getHostNameState() {
|
||||
return hostNameState;
|
||||
}
|
||||
|
||||
public void setRackNameState(WordList state) {
|
||||
this.rackNameState = state;
|
||||
}
|
||||
|
||||
public void setHostNameState(WordList state) {
|
||||
this.hostNameState = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "node";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setName(String name) {
|
||||
// for now, simply assert since this class has a hardcoded name
|
||||
if (!getName().equals(name)) {
|
||||
throw new RuntimeException("State name mismatch! Expected '"
|
||||
+ getName() + "' but found '" + name + "'.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public NodeName(String nodeName) {
|
||||
this.nodeName = nodeName;
|
||||
ParsedHost pHost = ParsedHost.parse(nodeName);
|
||||
if (pHost == null) {
|
||||
this.rackName = null;
|
||||
this.hostName = nodeName;
|
||||
} else {
|
||||
//TODO check for null and improve .. possibly call NodeName(r,h)
|
||||
this.rackName = pHost.getRackName();
|
||||
this.hostName = pHost.getNodeName();
|
||||
}
|
||||
}
|
||||
|
||||
public NodeName(String rName, String hName) {
|
||||
rName = (rName == null)
|
||||
? rName
|
||||
: rName.length() == 0
|
||||
? null
|
||||
: rName;
|
||||
hName = (hName == null)
|
||||
? hName
|
||||
: hName.length() == 0
|
||||
? null
|
||||
: hName;
|
||||
if (hName == null) {
|
||||
nodeName = rName;
|
||||
rackName = rName;
|
||||
} else if (rName == null) {
|
||||
nodeName = hName;
|
||||
ParsedHost pHost = ParsedHost.parse(nodeName);
|
||||
if (pHost == null) {
|
||||
this.rackName = null;
|
||||
this.hostName = hName;
|
||||
} else {
|
||||
this.rackName = pHost.getRackName();
|
||||
this.hostName = pHost.getNodeName();
|
||||
}
|
||||
} else {
|
||||
rackName = rName;
|
||||
this.hostName = hName;
|
||||
this.nodeName = "/" + rName + "/" + hName;
|
||||
}
|
||||
}
|
||||
|
||||
public String getHostName() {
|
||||
return hostName;
|
||||
}
|
||||
|
||||
public String getRackName() {
|
||||
return rackName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return nodeName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAnonymizedValue(StatePool statePool, Configuration conf) {
|
||||
if (this.getValue().equals(ROOT.getValue())) {
|
||||
return getValue();
|
||||
}
|
||||
if (anonymizedNodeName == null) {
|
||||
anonymize(statePool);
|
||||
}
|
||||
return anonymizedNodeName;
|
||||
}
|
||||
|
||||
private void anonymize(StatePool pool) {
|
||||
StringBuffer buf = new StringBuffer();
|
||||
NodeNameState state = (NodeNameState) pool.getState(getClass());
|
||||
if (state == null) {
|
||||
state = new NodeNameState();
|
||||
pool.addState(getClass(), state);
|
||||
}
|
||||
|
||||
if (rackName != null && hostName != null) {
|
||||
buf.append('/');
|
||||
buf.append(anonymize(rackName, state.getRackNameState()));
|
||||
buf.append('/');
|
||||
buf.append(anonymize(hostName, state.getHostNameState()));
|
||||
} else {
|
||||
if (state.getRackNameState().contains(nodeName) || rackName != null) {
|
||||
buf.append(anonymize(nodeName, state.getRackNameState()));
|
||||
} else {
|
||||
buf.append(anonymize(nodeName, state.getHostNameState()));
|
||||
}
|
||||
}
|
||||
|
||||
anonymizedNodeName = buf.toString();
|
||||
}
|
||||
|
||||
//TODO There is no caching for saving memory.
|
||||
private static String anonymize(String data, WordList wordList) {
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!wordList.contains(data)) {
|
||||
wordList.add(data);
|
||||
}
|
||||
return wordList.getName() + wordList.indexOf(data);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes;
|
||||
|
||||
|
||||
/**
|
||||
* Represents a queue name.
|
||||
*/
|
||||
public class QueueName extends DefaultAnonymizableDataType {
|
||||
private final String queueName;
|
||||
|
||||
public QueueName(String queueName) {
|
||||
super();
|
||||
this.queueName = queueName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return queueName;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getPrefix() {
|
||||
return "queue";
|
||||
};
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes;
|
||||
|
||||
/**
|
||||
* Represents a user's name.
|
||||
*/
|
||||
public class UserName extends DefaultAnonymizableDataType {
|
||||
private final String userName;
|
||||
|
||||
public UserName(String userName) {
|
||||
super();
|
||||
this.userName = userName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return userName;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getPrefix() {
|
||||
return "user";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes.util;
|
||||
|
||||
import org.apache.hadoop.tools.rumen.datatypes.DataType;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.DefaultDataType;
|
||||
|
||||
/**
|
||||
* A simple job property parser that acts like a pass-through filter.
|
||||
*/
|
||||
public class DefaultJobPropertiesParser implements JobPropertyParser {
|
||||
@Override
|
||||
public DataType<?> parseJobProperty(String key, String value) {
|
||||
return new DefaultDataType(value);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes.util;
|
||||
|
||||
import org.apache.hadoop.tools.rumen.datatypes.DataType;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
|
||||
|
||||
/**
|
||||
* A {@link JobProperties} parsing utility.
|
||||
*/
|
||||
public interface JobPropertyParser {
|
||||
/**
|
||||
* Parse the specified job configuration key-value pair.
|
||||
*
|
||||
* @return Returns a {@link DataType} if this parser can parse this value.
|
||||
* Returns 'null' otherwise.
|
||||
*/
|
||||
public DataType<?> parseJobProperty(String key, String value);
|
||||
}
|
|
@ -0,0 +1,227 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.datatypes.util;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.text.DecimalFormat;
|
||||
import java.text.ParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.*;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
/**
|
||||
* A default parser for MapReduce job configuration properties.
|
||||
* MapReduce job configuration properties are represented as key-value pairs.
|
||||
* Each key represents a configuration knob which controls or affects the
|
||||
* behavior of a MapReduce job or a job's task. The value associated with the
|
||||
* configuration key represents its value. Some of the keys are deprecated. As a
|
||||
* result of deprecation some keys change or are preferred over other keys,
|
||||
* across versions. {@link MapReduceJobPropertiesParser} is a utility class that
|
||||
* parses MapReduce job configuration properties and converts the value into a
|
||||
* well defined {@link DataType}. Users can use the
|
||||
* {@link MapReduceJobPropertiesParser#parseJobProperty()} API to process job
|
||||
* configuration parameters. This API will parse a job property represented as a
|
||||
* key-value pair and return the value wrapped inside a {@link DataType}.
|
||||
* Callers can then use the returned {@link DataType} for further processing.
|
||||
*
|
||||
* {@link MapReduceJobPropertiesParser} thrives on the key name to decide which
|
||||
* {@link DataType} to wrap the value with. Values for keys representing
|
||||
* job-name, queue-name, user-name etc are wrapped inside {@link JobName},
|
||||
* {@link QueueName}, {@link UserName} etc respectively. Keys ending with *dir*
|
||||
* are considered as a directory and hence gets be wrapped inside
|
||||
* {@link FileName}. Similarly key ending with *codec*, *log*, *class* etc are
|
||||
* also handled accordingly. Values representing basic java data-types like
|
||||
* integer, float, double, boolean etc are wrapped inside
|
||||
* {@link DefaultDataType}. If the key represents some jvm-level settings then
|
||||
* only standard settings are extracted and gets wrapped inside
|
||||
* {@link DefaultDataType}. Currently only '-Xmx' and '-Xms' settings are
|
||||
* considered while the rest are ignored.
|
||||
*
|
||||
* Note that the {@link MapReduceJobPropertiesParser#parseJobProperty()} API
|
||||
* maps the keys to a configuration parameter listed in
|
||||
* {@link MRJobConfig}. This not only filters non-framework specific keys thus
|
||||
* ignoring user-specific and hard-to-parse keys but also provides a consistent
|
||||
* view for all possible inputs. So if users invoke the
|
||||
* {@link MapReduceJobPropertiesParser#parseJobProperty()} API with either
|
||||
* <"mapreduce.job.user.name", "bob"> or <"user.name", "bob">, then the result
|
||||
* would be a {@link UserName} {@link DataType} wrapping the user-name "bob".
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
public class MapReduceJobPropertiesParser implements JobPropertyParser {
|
||||
private Field[] mrFields = MRJobConfig.class.getFields();
|
||||
private DecimalFormat format = new DecimalFormat();
|
||||
private JobConf configuration = new JobConf(false);
|
||||
private static final Pattern MAX_HEAP_PATTERN =
|
||||
Pattern.compile("-Xmx[0-9]+[kKmMgGtT]?+");
|
||||
private static final Pattern MIN_HEAP_PATTERN =
|
||||
Pattern.compile("-Xms[0-9]+[kKmMgGtT]?+");
|
||||
|
||||
// turn off the warning w.r.t deprecated mapreduce keys
|
||||
static {
|
||||
Logger.getLogger(Configuration.class).setLevel(Level.OFF);
|
||||
}
|
||||
|
||||
// Accepts a key if there is a corresponding key in the current mapreduce
|
||||
// configuration
|
||||
private boolean accept(String key) {
|
||||
return getLatestKeyName(key) != null;
|
||||
}
|
||||
|
||||
// Finds a corresponding key for the specified key in the current mapreduce
|
||||
// setup.
|
||||
// Note that this API uses a cached copy of the Configuration object. This is
|
||||
// purely for performance reasons.
|
||||
private String getLatestKeyName(String key) {
|
||||
// set the specified key
|
||||
configuration.set(key, key);
|
||||
try {
|
||||
// check if keys in MRConfig maps to the specified key.
|
||||
for (Field f : mrFields) {
|
||||
String mrKey = f.get(f.getName()).toString();
|
||||
if (configuration.get(mrKey) != null) {
|
||||
return mrKey;
|
||||
}
|
||||
}
|
||||
|
||||
// unset the key
|
||||
return null;
|
||||
} catch (IllegalAccessException iae) {
|
||||
throw new RuntimeException(iae);
|
||||
} finally {
|
||||
// clean up!
|
||||
configuration.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataType<?> parseJobProperty(String key, String value) {
|
||||
if (accept(key)) {
|
||||
return fromString(key, value);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the -Xmx heap option from the specified string.
|
||||
*/
|
||||
public static void extractMaxHeapOpts(String javaOptions,
|
||||
List<String> heapOpts,
|
||||
List<String> others) {
|
||||
for (String opt : javaOptions.split(" ")) {
|
||||
Matcher matcher = MAX_HEAP_PATTERN.matcher(opt);
|
||||
if (matcher.find()) {
|
||||
heapOpts.add(opt);
|
||||
} else {
|
||||
others.add(opt);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the -Xms heap option from the specified string.
|
||||
*/
|
||||
public static void extractMinHeapOpts(String javaOptions,
|
||||
List<String> heapOpts, List<String> others) {
|
||||
for (String opt : javaOptions.split(" ")) {
|
||||
Matcher matcher = MIN_HEAP_PATTERN.matcher(opt);
|
||||
if (matcher.find()) {
|
||||
heapOpts.add(opt);
|
||||
} else {
|
||||
others.add(opt);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Maps the value of the specified key.
|
||||
private DataType<?> fromString(String key, String value) {
|
||||
if (value != null) {
|
||||
// check known configs
|
||||
// job-name
|
||||
String latestKey = getLatestKeyName(key);
|
||||
|
||||
if (MRJobConfig.JOB_NAME.equals(latestKey)) {
|
||||
return new JobName(value);
|
||||
}
|
||||
// user-name
|
||||
if (MRJobConfig.USER_NAME.equals(latestKey)) {
|
||||
return new UserName(value);
|
||||
}
|
||||
// queue-name
|
||||
if (MRJobConfig.QUEUE_NAME.equals(latestKey)) {
|
||||
return new QueueName(value);
|
||||
}
|
||||
if (MRJobConfig.MAP_JAVA_OPTS.equals(latestKey)
|
||||
|| MRJobConfig.REDUCE_JAVA_OPTS.equals(latestKey)) {
|
||||
List<String> heapOptions = new ArrayList<String>();
|
||||
extractMaxHeapOpts(value, heapOptions, new ArrayList<String>());
|
||||
extractMinHeapOpts(value, heapOptions, new ArrayList<String>());
|
||||
return new DefaultDataType(StringUtils.join(heapOptions, ' '));
|
||||
}
|
||||
|
||||
//TODO compression?
|
||||
//TODO Other job configs like FileOutputFormat/FileInputFormat etc
|
||||
|
||||
// check if the config parameter represents a number
|
||||
try {
|
||||
format.parse(value);
|
||||
return new DefaultDataType(value);
|
||||
} catch (ParseException pe) {}
|
||||
|
||||
// check if the config parameters represents a boolean
|
||||
// avoiding exceptions
|
||||
if ("true".equals(value) || "false".equals(value)) {
|
||||
Boolean.parseBoolean(value);
|
||||
return new DefaultDataType(value);
|
||||
}
|
||||
|
||||
// check if the config parameter represents a class
|
||||
if (latestKey.endsWith(".class") || latestKey.endsWith(".codec")) {
|
||||
return new ClassName(value);
|
||||
}
|
||||
|
||||
// handle distributed cache sizes and timestamps
|
||||
if (latestKey.endsWith("sizes")
|
||||
|| latestKey.endsWith(".timestamps")) {
|
||||
new DefaultDataType(value);
|
||||
}
|
||||
|
||||
// check if the config parameter represents a file-system path
|
||||
//TODO: Make this concrete .location .path .dir .jar?
|
||||
if (latestKey.endsWith(".dir") || latestKey.endsWith(".location")
|
||||
|| latestKey.endsWith(".jar") || latestKey.endsWith(".path")
|
||||
|| latestKey.endsWith(".logfile") || latestKey.endsWith(".file")
|
||||
|| latestKey.endsWith(".files") || latestKey.endsWith(".archives")) {
|
||||
try {
|
||||
return new FileName(value);
|
||||
} catch (Exception ioe) {}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.serializers;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.JsonProcessingException;
|
||||
import org.codehaus.jackson.map.JsonSerializer;
|
||||
import org.codehaus.jackson.map.SerializerProvider;
|
||||
|
||||
/**
|
||||
* A JSON serializer for Strings.
|
||||
*/
|
||||
public class BlockingSerializer extends JsonSerializer<String> {
|
||||
|
||||
public void serialize(String object, JsonGenerator jGen, SerializerProvider sProvider)
|
||||
throws IOException, JsonProcessingException {
|
||||
jGen.writeNull();
|
||||
};
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.serializers;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.AnonymizableDataType;
|
||||
import org.apache.hadoop.tools.rumen.state.StatePool;
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.JsonProcessingException;
|
||||
import org.codehaus.jackson.map.JsonSerializer;
|
||||
import org.codehaus.jackson.map.SerializerProvider;
|
||||
|
||||
/**
|
||||
* Default Rumen JSON serializer.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public class DefaultAnonymizingRumenSerializer
|
||||
extends JsonSerializer<AnonymizableDataType> {
|
||||
private StatePool statePool;
|
||||
private Configuration conf;
|
||||
|
||||
public DefaultAnonymizingRumenSerializer(StatePool statePool,
|
||||
Configuration conf) {
|
||||
this.statePool = statePool;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
public void serialize(AnonymizableDataType object, JsonGenerator jGen,
|
||||
SerializerProvider sProvider)
|
||||
throws IOException, JsonProcessingException {
|
||||
Object val = object.getAnonymizedValue(statePool, conf);
|
||||
// output the data if its a string
|
||||
if (val instanceof String) {
|
||||
jGen.writeString(val.toString());
|
||||
} else {
|
||||
// let the mapper (JSON generator) handle this anonymized object.
|
||||
jGen.writeObject(val);
|
||||
}
|
||||
};
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.serializers;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.tools.rumen.datatypes.DataType;
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.JsonProcessingException;
|
||||
import org.codehaus.jackson.map.JsonSerializer;
|
||||
import org.codehaus.jackson.map.SerializerProvider;
|
||||
|
||||
/**
|
||||
* Default Rumen JSON serializer.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public class DefaultRumenSerializer extends JsonSerializer<DataType> {
|
||||
public void serialize(DataType object, JsonGenerator jGen, SerializerProvider sProvider)
|
||||
throws IOException, JsonProcessingException {
|
||||
Object data = object.getValue();
|
||||
if (data instanceof String) {
|
||||
jGen.writeString(data.toString());
|
||||
} else {
|
||||
jGen.writeObject(data);
|
||||
}
|
||||
};
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.serializers;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.JsonProcessingException;
|
||||
import org.codehaus.jackson.map.JsonSerializer;
|
||||
import org.codehaus.jackson.map.SerializerProvider;
|
||||
|
||||
/**
|
||||
* Rumen JSON serializer for serializing object using toSring() API.
|
||||
*/
|
||||
public class ObjectStringSerializer<T> extends JsonSerializer<T> {
|
||||
public void serialize(T object, JsonGenerator jGen, SerializerProvider sProvider)
|
||||
throws IOException, JsonProcessingException {
|
||||
jGen.writeString(object.toString());
|
||||
};
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.state;
|
||||
|
||||
import org.codehaus.jackson.annotate.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Represents a state. This state is managed by {@link StatePool}.
|
||||
*
|
||||
* Note that a {@link State} objects should be persistable. Currently, the
|
||||
* {@link State} objects are persisted using the Jackson JSON library. Hence the
|
||||
* implementors of the {@link State} interface should be careful while defining
|
||||
* their public setter and getter APIs.
|
||||
*/
|
||||
public interface State {
|
||||
/**
|
||||
* Returns true if the state is updated since creation (or reload).
|
||||
*/
|
||||
@JsonIgnore
|
||||
boolean isUpdated();
|
||||
|
||||
/**
|
||||
* Get the name of the state.
|
||||
*/
|
||||
public String getName();
|
||||
|
||||
/**
|
||||
* Set the name of the state.
|
||||
*/
|
||||
public void setName(String name);
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.state;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.tools.rumen.state.StatePool.StatePair;
|
||||
import org.codehaus.jackson.JsonParser;
|
||||
import org.codehaus.jackson.JsonProcessingException;
|
||||
import org.codehaus.jackson.map.DeserializationContext;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.map.deser.StdDeserializer;
|
||||
import org.codehaus.jackson.node.ObjectNode;
|
||||
|
||||
/**
|
||||
* Rumen JSON deserializer for deserializing the {@link State} object.
|
||||
*/
|
||||
public class StateDeserializer extends StdDeserializer<StatePair> {
|
||||
public StateDeserializer() {
|
||||
super(StatePair.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StatePair deserialize(JsonParser parser,
|
||||
DeserializationContext context)
|
||||
throws IOException, JsonProcessingException {
|
||||
ObjectMapper mapper = (ObjectMapper) parser.getCodec();
|
||||
// set the state-pair object tree
|
||||
ObjectNode statePairObject = (ObjectNode) mapper.readTree(parser);
|
||||
Class<?> stateClass = null;
|
||||
|
||||
try {
|
||||
stateClass =
|
||||
Class.forName(statePairObject.get("className").getTextValue().trim());
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new RuntimeException("Invalid classname!", cnfe);
|
||||
}
|
||||
|
||||
String stateJsonString = statePairObject.get("state").toString();
|
||||
State state = (State) mapper.readValue(stateJsonString, stateClass);
|
||||
|
||||
return new StatePair(state);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,345 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen.state;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutput;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Calendar;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.tools.rumen.Anonymizer;
|
||||
import org.apache.hadoop.tools.rumen.datatypes.DataType;
|
||||
import org.codehaus.jackson.JsonEncoding;
|
||||
import org.codehaus.jackson.JsonFactory;
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.JsonParser;
|
||||
import org.codehaus.jackson.Version;
|
||||
import org.codehaus.jackson.annotate.JsonIgnore;
|
||||
import org.codehaus.jackson.map.DeserializationConfig;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.map.SerializationConfig;
|
||||
import org.codehaus.jackson.map.module.SimpleModule;
|
||||
|
||||
/**
|
||||
* A pool of states. States used by {@link DataType}'s can be managed the
|
||||
* {@link StatePool}. {@link StatePool} also supports persistence. Persistence
|
||||
* is key to share states across multiple {@link Anonymizer} runs.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public class StatePool {
|
||||
private static final long VERSION = 1L;
|
||||
private boolean isUpdated = false;
|
||||
private boolean isInitialized = false;
|
||||
private Configuration conf;
|
||||
|
||||
// persistence configuration
|
||||
public static final String DIR_CONFIG = "rumen.anonymization.states.dir";
|
||||
public static final String RELOAD_CONFIG =
|
||||
"rumen.anonymization.states.reload";
|
||||
public static final String PERSIST_CONFIG =
|
||||
"rumen.anonymization.states.persist";
|
||||
|
||||
// internal state management configs
|
||||
private static final String COMMIT_STATE_FILENAME = "latest";
|
||||
private static final String CURRENT_STATE_FILENAME = "temp";
|
||||
|
||||
private String timeStamp;
|
||||
private Path persistDirPath;
|
||||
private boolean reload;
|
||||
private boolean persist;
|
||||
|
||||
/**
|
||||
* A wrapper class that binds the state implementation to its implementing
|
||||
* class name.
|
||||
*/
|
||||
public static class StatePair {
|
||||
private String className;
|
||||
private State state;
|
||||
|
||||
public StatePair(State state) {
|
||||
this.className = state.getClass().getName();
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
public String getClassName() {
|
||||
return className;
|
||||
}
|
||||
|
||||
public void setClassName(String className) {
|
||||
this.className = className;
|
||||
}
|
||||
|
||||
public State getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
public void setState(State state) {
|
||||
this.state = state;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Identifies to identify and cache {@link State}s.
|
||||
*/
|
||||
private HashMap<String, StatePair> pool = new HashMap<String, StatePair>();
|
||||
|
||||
public void addState(Class id, State state) {
|
||||
if (pool.containsKey(id.getName())) {
|
||||
throw new RuntimeException("State '" + state.getName() + "' added for the"
|
||||
+ " class " + id.getName() + " already exists!");
|
||||
}
|
||||
isUpdated = true;
|
||||
pool.put(id.getName(), new StatePair(state));
|
||||
}
|
||||
|
||||
public State getState(Class clazz) {
|
||||
return pool.containsKey(clazz.getName())
|
||||
? pool.get(clazz.getName()).getState()
|
||||
: null;
|
||||
}
|
||||
|
||||
// For testing
|
||||
@JsonIgnore
|
||||
public boolean isUpdated() {
|
||||
if (!isUpdated) {
|
||||
for (StatePair statePair : pool.values()) {
|
||||
// if one of the states have changed, then the pool is dirty
|
||||
if (statePair.getState().isUpdated()) {
|
||||
isUpdated = true;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return isUpdated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialized the {@link StatePool}. This API also reloads the previously
|
||||
* persisted state. Note that the {@link StatePool} should be initialized only
|
||||
* once.
|
||||
*/
|
||||
public void initialize(Configuration conf) throws Exception {
|
||||
if (isInitialized) {
|
||||
throw new RuntimeException("StatePool is already initialized!");
|
||||
}
|
||||
|
||||
this.conf = conf;
|
||||
String persistDir = conf.get(DIR_CONFIG);
|
||||
reload = conf.getBoolean(RELOAD_CONFIG, false);
|
||||
persist = conf.getBoolean(PERSIST_CONFIG, false);
|
||||
|
||||
// reload if configured
|
||||
if (reload || persist) {
|
||||
System.out.println("State Manager initializing. State directory : "
|
||||
+ persistDir);
|
||||
System.out.println("Reload:" + reload + " Persist:" + persist);
|
||||
if (persistDir == null) {
|
||||
throw new RuntimeException("No state persist directory configured!"
|
||||
+ " Disable persistence.");
|
||||
} else {
|
||||
this.persistDirPath = new Path(persistDir);
|
||||
}
|
||||
} else {
|
||||
System.out.println("State Manager disabled.");
|
||||
}
|
||||
|
||||
// reload
|
||||
reload();
|
||||
|
||||
// now set the timestamp
|
||||
DateFormat formatter =
|
||||
new SimpleDateFormat("dd-MMM-yyyy-hh'H'-mm'M'-ss'S'");
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
calendar.setTimeInMillis(System.currentTimeMillis());
|
||||
timeStamp = formatter.format(calendar.getTime());
|
||||
|
||||
isInitialized = true;
|
||||
}
|
||||
|
||||
private void reload() throws Exception {
|
||||
if (reload) {
|
||||
// Reload persisted entries
|
||||
Path stateFilename = new Path(persistDirPath, COMMIT_STATE_FILENAME);
|
||||
FileSystem fs = stateFilename.getFileSystem(conf);
|
||||
if (fs.exists(stateFilename)) {
|
||||
reloadState(stateFilename, conf);
|
||||
} else {
|
||||
throw new RuntimeException("No latest state persist directory found!"
|
||||
+ " Disable persistence and run.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void reloadState(Path stateFile, Configuration conf)
|
||||
throws Exception {
|
||||
FileSystem fs = stateFile.getFileSystem(conf);
|
||||
if (fs.exists(stateFile)) {
|
||||
System.out.println("Reading state from " + stateFile.toString());
|
||||
FSDataInputStream in = fs.open(stateFile);
|
||||
|
||||
read(in);
|
||||
in.close();
|
||||
} else {
|
||||
System.out.println("No state information found for " + stateFile);
|
||||
}
|
||||
}
|
||||
|
||||
private void read(DataInput in) throws IOException {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(
|
||||
DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
|
||||
|
||||
// define a module
|
||||
SimpleModule module = new SimpleModule("State Serializer",
|
||||
new Version(0, 1, 1, "FINAL"));
|
||||
// add the state deserializer
|
||||
module.addDeserializer(StatePair.class, new StateDeserializer());
|
||||
|
||||
// register the module with the object-mapper
|
||||
mapper.registerModule(module);
|
||||
|
||||
JsonParser parser =
|
||||
mapper.getJsonFactory().createJsonParser((DataInputStream)in);
|
||||
StatePool statePool = mapper.readValue(parser, StatePool.class);
|
||||
this.setStates(statePool.getStates());
|
||||
parser.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Persists the current state to the state directory. The state will be
|
||||
* persisted to the 'latest' file in the state directory.
|
||||
*/
|
||||
public void persist() throws IOException {
|
||||
if (!persist) {
|
||||
return;
|
||||
}
|
||||
if (isUpdated()) {
|
||||
System.out.println("State is updated! Committing.");
|
||||
Path currStateFile = new Path(persistDirPath, CURRENT_STATE_FILENAME);
|
||||
Path commitStateFile = new Path(persistDirPath, COMMIT_STATE_FILENAME);
|
||||
FileSystem fs = currStateFile.getFileSystem(conf);
|
||||
|
||||
System.out.println("Starting the persist phase. Persisting to "
|
||||
+ currStateFile.toString());
|
||||
// persist current state
|
||||
// write the contents of the current state to the current(temp) directory
|
||||
FSDataOutputStream out = fs.create(currStateFile, true);
|
||||
write(out);
|
||||
out.close();
|
||||
|
||||
System.out.println("Persist phase over. The best known un-committed state"
|
||||
+ " is located at " + currStateFile.toString());
|
||||
|
||||
// commit (phase-1)
|
||||
// copy the previous commit file to the relocation file
|
||||
if (fs.exists(commitStateFile)) {
|
||||
Path commitRelocationFile = new Path(persistDirPath, timeStamp);
|
||||
System.out.println("Starting the pre-commit phase. Moving the previous "
|
||||
+ "best known state to " + commitRelocationFile.toString());
|
||||
// copy the commit file to the relocation file
|
||||
FileUtil.copy(fs,commitStateFile, fs, commitRelocationFile, false,
|
||||
conf);
|
||||
}
|
||||
|
||||
// commit (phase-2)
|
||||
System.out.println("Starting the commit phase. Committing the states in "
|
||||
+ currStateFile.toString());
|
||||
FileUtil.copy(fs, currStateFile, fs, commitStateFile, true, true, conf);
|
||||
|
||||
System.out.println("Commit phase successful! The best known committed "
|
||||
+ "state is located at " + commitStateFile.toString());
|
||||
} else {
|
||||
System.out.println("State not updated! No commit required.");
|
||||
}
|
||||
}
|
||||
|
||||
private void write(DataOutput out) throws IOException {
|
||||
// This is just a JSON experiment
|
||||
System.out.println("Dumping the StatePool's in JSON format.");
|
||||
ObjectMapper outMapper = new ObjectMapper();
|
||||
outMapper.configure(
|
||||
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
|
||||
// define a module
|
||||
SimpleModule module = new SimpleModule("State Serializer",
|
||||
new Version(0, 1, 1, "FINAL"));
|
||||
// add the state serializer
|
||||
//module.addSerializer(State.class, new StateSerializer());
|
||||
|
||||
// register the module with the object-mapper
|
||||
outMapper.registerModule(module);
|
||||
|
||||
JsonFactory outFactory = outMapper.getJsonFactory();
|
||||
JsonGenerator jGen =
|
||||
outFactory.createJsonGenerator((DataOutputStream)out, JsonEncoding.UTF8);
|
||||
jGen.useDefaultPrettyPrinter();
|
||||
|
||||
jGen.writeObject(this);
|
||||
jGen.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Getters and setters for JSON serialization
|
||||
*/
|
||||
|
||||
/**
|
||||
* To be invoked only by the Jackson JSON serializer.
|
||||
*/
|
||||
public long getVersion() {
|
||||
return VERSION;
|
||||
}
|
||||
|
||||
/**
|
||||
* To be invoked only by the Jackson JSON deserializer.
|
||||
*/
|
||||
public void setVersion(long version) {
|
||||
if (version != VERSION) {
|
||||
throw new RuntimeException("Version mismatch! Expected " + VERSION
|
||||
+ " got " + version);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* To be invoked only by the Jackson JSON serializer.
|
||||
*/
|
||||
public HashMap<String, StatePair> getStates() {
|
||||
return pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* To be invoked only by the Jackson JSON deserializer.
|
||||
*/
|
||||
public void setStates(HashMap<String, StatePair> states) {
|
||||
if (pool.size() > 0) {
|
||||
throw new RuntimeException("Pool not empty!");
|
||||
}
|
||||
|
||||
//TODO Should we do a clone?
|
||||
this.pool = states;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue