From 477505ccfc480f2605a7b65de95ea6f6ff5ce090 Mon Sep 17 00:00:00 2001 From: Takanobu Asanuma Date: Sat, 2 Nov 2019 01:32:32 +0900 Subject: [PATCH] HDFS-14824. [Dynamometer] Dynamometer in org.apache.hadoop.tools does not output the benchmark results. (#1685) --- .../hadoop/tools/dynamometer/Client.java | 7 ++ .../dynamometer/TestDynamometerInfra.java | 7 +- .../workloadgenerator/CreateFileMapper.java | 3 +- .../workloadgenerator/WorkloadDriver.java | 27 ++--- .../workloadgenerator/WorkloadMapper.java | 31 +++-- .../audit/AuditReplayMapper.java | 39 ++++-- .../audit/AuditReplayReducer.java | 44 +++++++ .../audit/AuditReplayThread.java | 18 +++ .../audit/CountTimeWritable.java | 82 +++++++++++++ .../audit/UserCommandKey.java | 111 ++++++++++++++++++ .../TestWorkloadGenerator.java | 42 +++++-- .../src/site/markdown/Dynamometer.md | 3 + 12 files changed, 366 insertions(+), 48 deletions(-) create mode 100644 hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayReducer.java create mode 100644 hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/CountTimeWritable.java create mode 100644 hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/UserCommandKey.java diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java index 42c1410fbb3..36f90b54d3f 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java @@ -166,6 +166,7 @@ public class Client extends Configured implements Tool { public static final String WORKLOAD_REPLAY_ENABLE_ARG = "workload_replay_enable"; public static final String WORKLOAD_INPUT_PATH_ARG = "workload_input_path"; + public static final String WORKLOAD_OUTPUT_PATH_ARG = "workload_output_path"; public static final String WORKLOAD_THREADS_PER_MAPPER_ARG = "workload_threads_per_mapper"; public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay"; @@ -231,6 +232,8 @@ public class Client extends Configured implements Tool { private volatile Job workloadJob; // The input path for the workload job. private String workloadInputPath = ""; + // The output path for the workload job metric results. + private String workloadOutputPath = ""; // The number of threads to use per mapper for the workload job. private int workloadThreadsPerMapper; // The startup delay for the workload job. @@ -347,6 +350,8 @@ public class Client extends Configured implements Tool { + "audit logs against the HDFS cluster which is started."); opts.addOption(WORKLOAD_INPUT_PATH_ARG, true, "Location of the audit traces to replay (Required for workload)"); + opts.addOption(WORKLOAD_OUTPUT_PATH_ARG, true, + "Location of the metrics output (Required for workload)"); opts.addOption(WORKLOAD_THREADS_PER_MAPPER_ARG, true, "Number of threads " + "per mapper to use to replay the workload. 
(default " + AuditReplayMapper.NUM_THREADS_DEFAULT + ")"); @@ -476,6 +481,7 @@ public class Client extends Configured implements Tool { } launchWorkloadJob = true; workloadInputPath = commandLine.getOptionValue(WORKLOAD_INPUT_PATH_ARG); + workloadOutputPath = commandLine.getOptionValue(WORKLOAD_OUTPUT_PATH_ARG); workloadThreadsPerMapper = Integer .parseInt(commandLine.getOptionValue(WORKLOAD_THREADS_PER_MAPPER_ARG, String.valueOf(AuditReplayMapper.NUM_THREADS_DEFAULT))); @@ -1032,6 +1038,7 @@ public class Client extends Configured implements Tool { + workloadStartDelayMs; Configuration workloadConf = new Configuration(getConf()); workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + workloadConf.set(AuditReplayMapper.OUTPUT_PATH_KEY, workloadOutputPath); workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY, workloadThreadsPerMapper); workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY, diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java index b008095eff8..056b7de70b8 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java @@ -122,7 +122,7 @@ public class TestDynamometerInfra { private static final String HADOOP_BIN_PATH_KEY = "dyno.hadoop.bin.path"; private static final String HADOOP_BIN_VERSION_KEY = "dyno.hadoop.bin.version"; - private static final String HADOOP_BIN_VERSION_DEFAULT = "3.1.2"; + private static final String HADOOP_BIN_VERSION_DEFAULT = "3.1.3"; private static final String FSIMAGE_FILENAME = "fsimage_0000000000000061740"; private static final String VERSION_FILENAME = "VERSION"; @@ -132,6 +132,8 @@ public class TestDynamometerInfra { private static final String NAMENODE_NODELABEL = "dyno_namenode"; private static final String DATANODE_NODELABEL = "dyno_datanode"; + private static final String OUTPUT_PATH = "/tmp/trace_output_direct"; + private static MiniDFSCluster miniDFSCluster; private static MiniYARNCluster miniYARNCluster; private static YarnClient yarnClient; @@ -408,6 +410,7 @@ public class TestDynamometerInfra { return false; } }, 3000, 60000); + assertTrue(fs.exists(new Path(OUTPUT_PATH))); } private void assertClusterIsFunctional(Configuration localConf, @@ -477,6 +480,8 @@ public class TestDynamometerInfra { "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG, "-" + Client.WORKLOAD_INPUT_PATH_ARG, fs.makeQualified(new Path("/tmp/audit_trace_direct")).toString(), + "-" + Client.WORKLOAD_OUTPUT_PATH_ARG, + fs.makeQualified(new Path(OUTPUT_PATH)).toString(), "-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1", "-" + Client.WORKLOAD_START_DELAY_ARG, "10s", "-" + AMOptions.NAMENODE_ARGS_ARG, diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/CreateFileMapper.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/CreateFileMapper.java index 24aec936d92..33dc81d5a24 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/CreateFileMapper.java +++ 
b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/CreateFileMapper.java @@ -48,7 +48,8 @@ import org.apache.hadoop.mapreduce.Mapper; * */ public class CreateFileMapper - extends WorkloadMapper { + extends WorkloadMapper { public static final String NUM_MAPPERS_KEY = "createfile.num-mappers"; public static final String DURATION_MIN_KEY = "createfile.duration-min"; diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java index d34cae72cea..8b170c12acc 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java @@ -32,9 +32,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -124,7 +122,7 @@ public class WorkloadDriver extends Configured implements Tool { startTimestampMs = tmpConf.getTimeDuration(tmpConfKey, 0, TimeUnit.MILLISECONDS) + System.currentTimeMillis(); } - Class> mapperClass = getMapperClass( + Class> mapperClass = getMapperClass( cli.getOptionValue(MAPPER_CLASS_NAME)); if (!mapperClass.newInstance().verifyConfigurations(getConf())) { System.err @@ -140,8 +138,9 @@ public class WorkloadDriver extends Configured implements Tool { } public static Job getJobForSubmission(Configuration baseConf, String nnURI, - long startTimestampMs, Class> mapperClass) - throws IOException, InstantiationException, IllegalAccessException { + long startTimestampMs, Class> + mapperClass) throws IOException, InstantiationException, + IllegalAccessException { Configuration conf = new Configuration(baseConf); conf.set(NN_URI, nnURI); conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false); @@ -153,16 +152,9 @@ public class WorkloadDriver extends Configured implements Tool { conf.setLong(START_TIMESTAMP_MS, startTimestampMs); Job job = Job.getInstance(conf, "Dynamometer Workload Driver"); - job.setOutputFormatClass(NullOutputFormat.class); job.setJarByClass(mapperClass); job.setMapperClass(mapperClass); - job.setInputFormatClass(mapperClass.newInstance().getInputFormat(conf)); - job.setOutputFormatClass(NullOutputFormat.class); - job.setNumReduceTasks(0); - job.setMapOutputKeyClass(NullWritable.class); - job.setMapOutputValueClass(NullWritable.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(NullWritable.class); + mapperClass.newInstance().configureJob(job); return job; } @@ -175,8 +167,8 @@ public class WorkloadDriver extends Configured implements Tool { // The cast is actually checked via isAssignableFrom but the compiler doesn't // recognize this @SuppressWarnings("unchecked") - private Class> getMapperClass(String className) - throws ClassNotFoundException { + private Class> getMapperClass( + String 
className) throws ClassNotFoundException { if (!className.contains(".")) { className = WorkloadDriver.class.getPackage().getName() + "." + className; } @@ -185,13 +177,14 @@ public class WorkloadDriver extends Configured implements Tool { throw new IllegalArgumentException(className + " is not a subclass of " + WorkloadMapper.class.getCanonicalName()); } - return (Class>) mapperClass; + return (Class>) mapperClass; } private String getMapperUsageInfo(String mapperClassName) throws ClassNotFoundException, InstantiationException, IllegalAccessException { - WorkloadMapper mapper = getMapperClass(mapperClassName).newInstance(); + WorkloadMapper mapper = getMapperClass(mapperClassName) + .newInstance(); StringBuilder builder = new StringBuilder("Usage for "); builder.append(mapper.getClass().getSimpleName()); builder.append(":\n"); diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadMapper.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadMapper.java index afe5d45e948..d73f5962d74 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadMapper.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadMapper.java @@ -21,25 +21,18 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; /** * Represents the base class for a generic workload-generating mapper. By * default, it will expect to use {@link VirtualInputFormat} as its - * {@link InputFormat}. Subclasses expecting a different {@link InputFormat} - * should override the {@link #getInputFormat(Configuration)} method. + * {@link InputFormat}. Subclasses requiring a reducer or expecting a different + * {@link InputFormat} should override the {@link #configureJob(Job)} method. */ -public abstract class WorkloadMapper - extends Mapper { - - /** - * Return the input class to be used by this mapper. - * @param conf configuration. - * @return the {@link InputFormat} implementation for the mapper. - */ - public Class getInputFormat(Configuration conf) { - return VirtualInputFormat.class; - } +public abstract class WorkloadMapper extends + Mapper { /** * Get the description of the behavior of this mapper. @@ -62,4 +55,16 @@ public abstract class WorkloadMapper */ public abstract boolean verifyConfigurations(Configuration conf); + /** + * Setup input and output formats and optional reducer. 
+ */ + public void configureJob(Job job) { + job.setInputFormatClass(VirtualInputFormat.class); + + job.setNumReduceTasks(0); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + } + } diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 27beda16a62..4dad215409c 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -20,6 +20,10 @@ package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit; import com.google.common.collect.Lists; import java.util.Optional; import java.util.function.Function; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadDriver; import org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadMapper; import java.io.IOException; @@ -35,7 +39,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -73,9 +76,11 @@ import static org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditR * are replayed. For example, a rate factor of 2 would make the replay occur * twice as fast, and a rate factor of 0.5 would make it occur half as fast. 
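To make the new contract concrete: a custom workload now only implements the abstract methods and inherits the map-only defaults from `configureJob(Job)` above, overriding it solely when it needs a reducer or a different input/output format. A minimal sketch under that assumption (the `NoOpMapper` class is hypothetical, not part of this patch):

```java
package org.apache.hadoop.tools.dynamometer.workloadgenerator;

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;

/** Hypothetical workload that relies on the default configureJob() wiring. */
public class NoOpMapper extends WorkloadMapper<NullWritable, NullWritable> {

  @Override
  public String getDescription() {
    return "Illustrative no-op workload.";
  }

  @Override
  public List<String> getConfigDescriptions() {
    return Collections.emptyList(); // no configuration keys of its own
  }

  @Override
  public boolean verifyConfigurations(Configuration conf) {
    return true; // nothing to validate
  }

  @Override
  public void map(NullWritable key, NullWritable value, Context context) {
    // Per-record replay logic would go here. Because configureJob() is not
    // overridden, the job keeps VirtualInputFormat, zero reduce tasks, and
    // NullOutputFormat, exactly as set by the base class above.
  }
}
```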
*/ -public class AuditReplayMapper extends WorkloadMapper { +public class AuditReplayMapper extends WorkloadMapper { public static final String INPUT_PATH_KEY = "auditreplay.input-path"; + public static final String OUTPUT_PATH_KEY = "auditreplay.output-path"; public static final String NUM_THREADS_KEY = "auditreplay.num-threads"; public static final int NUM_THREADS_DEFAULT = 1; public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks"; @@ -170,11 +175,6 @@ public class AuditReplayMapper extends WorkloadMapper { private AuditCommandParser commandParser; private ScheduledThreadPoolExecutor progressExecutor; - @Override - public Class getInputFormat(Configuration conf) { - return NoSplitTextInputFormat.class; - } - @Override public String getDescription() { return "This mapper replays audit log files."; @@ -185,6 +185,7 @@ public class AuditReplayMapper extends WorkloadMapper { return Lists.newArrayList( INPUT_PATH_KEY + " (required): Path to directory containing input files.", + OUTPUT_PATH_KEY + " (required): Path to destination for output files.", NUM_THREADS_KEY + " (default " + NUM_THREADS_DEFAULT + "): Number of threads to use per mapper for replay.", CREATE_BLOCKS_KEY + " (default " + CREATE_BLOCKS_DEFAULT @@ -199,7 +200,8 @@ public class AuditReplayMapper extends WorkloadMapper { @Override public boolean verifyConfigurations(Configuration conf) { - return conf.get(INPUT_PATH_KEY) != null; + return conf.get(INPUT_PATH_KEY) != null + && conf.get(OUTPUT_PATH_KEY) != null; } @Override @@ -256,7 +258,8 @@ public class AuditReplayMapper extends WorkloadMapper { } @Override - public void cleanup(Mapper.Context context) throws InterruptedException { + public void cleanup(Mapper.Context context) + throws InterruptedException, IOException { for (AuditReplayThread t : threads) { // Add in an indicator for each thread to shut down after the last real // command @@ -266,6 +269,7 @@ public class AuditReplayMapper extends WorkloadMapper { for (AuditReplayThread t : threads) { t.join(); t.drainCounters(context); + t.drainCommandLatencies(context); if (t.getException() != null) { threadException = Optional.of(t.getException()); } @@ -287,4 +291,21 @@ public class AuditReplayMapper extends WorkloadMapper { LOG.info("Percentage of invalid ops: " + percentageOfInvalidOps); } } + + @Override + public void configureJob(Job job) { + job.setMapOutputKeyClass(UserCommandKey.class); + job.setMapOutputValueClass(CountTimeWritable.class); + job.setInputFormatClass(NoSplitTextInputFormat.class); + + job.setNumReduceTasks(1); + job.setReducerClass(AuditReplayReducer.class); + job.setOutputKeyClass(UserCommandKey.class); + job.setOutputValueClass(CountTimeWritable.class); + job.setOutputFormatClass(TextOutputFormat.class); + + TextOutputFormat.setOutputPath(job, new Path( + job.getConfiguration().get(OUTPUT_PATH_KEY))); + job.getConfiguration().set(TextOutputFormat.SEPARATOR, ","); + } } diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayReducer.java new file mode 100644 index 00000000000..cde1630c8ac --- /dev/null +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit; + +import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; + +/** + * AuditReplayReducer aggregates the returned latency values from + * {@link AuditReplayMapper} and sums them up by {@link UserCommandKey}, which + * combines the user's id that ran the command and the type of the command + * (READ/WRITE). + */ +public class AuditReplayReducer extends Reducer { + + @Override + protected void reduce(UserCommandKey key, Iterable values, + Context context) throws IOException, InterruptedException { + long countSum = 0; + long timeSum = 0; + for (CountTimeWritable v : values) { + countSum += v.getCount(); + timeSum += v.getTime(); + } + context.write(key, new CountTimeWritable(countSum, timeSum)); + } +} diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayThread.java index e63c7a3b096..274c5a763bd 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -76,6 +76,8 @@ public class AuditReplayThread extends Thread { // and merge them all together at the end. private Map replayCountersMap = new HashMap<>(); private Map individualCommandsMap = new HashMap<>(); + private Map commandLatencyMap + = new HashMap<>(); AuditReplayThread(Mapper.Context mapperContext, DelayQueue queue, @@ -123,6 +125,14 @@ public class AuditReplayThread extends Thread { } } + void drainCommandLatencies(Mapper.Context context) + throws InterruptedException, IOException { + for (Map.Entry ent + : commandLatencyMap.entrySet()) { + context.write(ent.getKey(), ent.getValue()); + } + } + /** * Add a command to this thread's processing queue. 
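The reducer above is a plain sum: for each `UserCommandKey` it adds up the per-mapper counts and cumulative latencies. A standalone sketch of that aggregation logic, assuming this patch's `UserCommandKey` and `CountTimeWritable` classes are on the classpath (the `LatencyMergeSketch` driver class is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.CountTimeWritable;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.UserCommandKey;

/** Merges per-thread latency records the same way the reducer sums values. */
public class LatencyMergeSketch {
  public static void main(String[] args) {
    Map<UserCommandKey, CountTimeWritable> merged = new HashMap<>();
    // Two hypothetical partial results for the same (user, command, type).
    accumulate(merged, new UserCommandKey("hdfs", "MKDIRS", "WRITE"), 2, 100);
    accumulate(merged, new UserCommandKey("hdfs", "MKDIRS", "WRITE"), 1, 50);
    // Prints "hdfs,WRITE,MKDIRS,3,150": count and latency summed per key.
    merged.forEach((k, v) -> System.out.println(k + "," + v));
  }

  private static void accumulate(Map<UserCommandKey, CountTimeWritable> map,
      UserCommandKey key, long count, long timeMs) {
    map.putIfAbsent(key, new CountTimeWritable());
    CountTimeWritable val = map.get(key);
    val.setCount(val.getCount() + count);
    val.setTime(val.getTime() + timeMs);
  }
}
```

This mirrors how `AuditReplayThread` accumulates into `commandLatencyMap` before `drainCommandLatencies` emits the entries to the mapper context.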
* @@ -279,6 +289,14 @@ public class AuditReplayThread extends Thread { throw new RuntimeException("Unexpected command: " + replayCommand); } long latency = System.currentTimeMillis() - startTime; + + UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), + replayCommand.toString(), replayCommand.getType().toString()); + commandLatencyMap.putIfAbsent(userCommandKey, new CountTimeWritable()); + CountTimeWritable latencyWritable = commandLatencyMap.get(userCommandKey); + latencyWritable.setCount(latencyWritable.getCount() + 1); + latencyWritable.setTime(latencyWritable.getTime() + latency); + switch (replayCommand.getType()) { case WRITE: replayCountersMap.get(REPLAYCOUNTERS.TOTALWRITECOMMANDLATENCY) diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/CountTimeWritable.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/CountTimeWritable.java new file mode 100644 index 00000000000..6b851c84b64 --- /dev/null +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/CountTimeWritable.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit; +
+import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +/** + * CountTimeWritable is a {@link Writable} used as a composite value that + * accumulates the count and cumulative latency of replayed commands. It is + * used as the output value for AuditReplayMapper and AuditReplayReducer.
+ */ +public class CountTimeWritable implements Writable { + private LongWritable count; + private LongWritable time; + + public CountTimeWritable() { + count = new LongWritable(); + time = new LongWritable(); + } + + public CountTimeWritable(LongWritable count, LongWritable time) { + this.count = count; + this.time = time; + } + + public CountTimeWritable(long count, long time) { + this.count = new LongWritable(count); + this.time = new LongWritable(time); + } + + public long getCount() { + return count.get(); + } + + public long getTime() { + return time.get(); + } + + public void setCount(long count) { + this.count.set(count); + } + + public void setTime(long time) { + this.time.set(time); + } + + @Override + public void write(DataOutput out) throws IOException { + count.write(out); + time.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + count.readFields(in); + time.readFields(in); + } + + @Override + public String toString() { + return getCount() + "," + getTime(); + } +} diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/UserCommandKey.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/UserCommandKey.java new file mode 100644 index 00000000000..5cfe09f3bf2 --- /dev/null +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/UserCommandKey.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; + +import javax.annotation.Nonnull; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Objects; + +/** + * UserCommandKey is a {@link org.apache.hadoop.io.Writable} used as a composite + * key combining the user id, name, and type of a replayed command. It is used + * as the output key for AuditReplayMapper and the keys for AuditReplayReducer. 
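Because `CountTimeWritable` is shuffled from mappers to the reducer, it has to survive Hadoop's `Writable` serialization round trip. A minimal sketch of that round trip using plain `java.io` streams (the `CountTimeRoundTrip` class is hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.CountTimeWritable;

/** Serializes and deserializes a CountTimeWritable, as the shuffle does. */
public class CountTimeRoundTrip {
  public static void main(String[] args) throws IOException {
    CountTimeWritable original = new CountTimeWritable(3, 150);

    // Map side: write the two LongWritable fields in order.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    // Reduce side: read them back into a fresh instance.
    CountTimeWritable copy = new CountTimeWritable();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(copy); // prints "3,150" (count, cumulative time)
  }
}
```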
+ */ +public class UserCommandKey implements WritableComparable { + private Text user; + private Text command; + private Text type; + + public UserCommandKey() { + user = new Text(); + command = new Text(); + type = new Text(); + } + + public UserCommandKey(Text user, Text command, Text type) { + this.user = user; + this.command = command; + this.type = type; + } + + public UserCommandKey(String user, String command, String type) { + this.user = new Text(user); + this.command = new Text(command); + this.type = new Text(type); + } + + public String getUser() { + return user.toString(); + } + + public String getCommand() { + return command.toString(); + } + + public String getType() { + return type.toString(); + } + + @Override + public void write(DataOutput out) throws IOException { + user.write(out); + command.write(out); + type.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + user.readFields(in); + command.readFields(in); + type.readFields(in); + } + + @Override + public int compareTo(@Nonnull Object o) { + return toString().compareTo(o.toString()); + } + + @Override + public String toString() { + return getUser() + "," + getType() + "," + getCommand(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UserCommandKey that = (UserCommandKey) o; + return getUser().equals(that.getUser()) && + getCommand().equals(that.getCommand()) && + getType().equals(that.getType()); + } + + @Override + public int hashCode() { + return Objects.hash(getUser(), getCommand(), getType()); + } +} diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/TestWorkloadGenerator.java index 5b2a2e71675..0162352f08f 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -17,11 +17,15 @@ */ package org.apache.hadoop.tools.dynamometer.workloadgenerator; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditCommandParser; import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogDirectParser; import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser; import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditReplayMapper; import java.io.IOException; +import java.nio.charset.StandardCharsets; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -34,9 +38,12 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.ImpersonationProvider; +import org.jline.utils.Log; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS; import static org.junit.Assert.assertEquals; @@ -46,6 +53,8 @@ import static org.junit.Assert.assertTrue; /** Tests for {@link WorkloadDriver} and related classes. */ public class TestWorkloadGenerator { + private static final Logger LOG = + LoggerFactory.getLogger(TestWorkloadGenerator.class); private Configuration conf; private MiniDFSCluster miniCluster; @@ -73,22 +82,27 @@ public class TestWorkloadGenerator { } @Test - public void testAuditWorkloadDirectParser() throws Exception { + public void testAuditWorkloadDirectParserWithOutput() throws Exception { String workloadInputPath = TestWorkloadGenerator.class.getClassLoader() .getResource("audit_trace_direct").toString(); + String auditOutputPath = "/tmp/trace_output_direct"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60 * 1000); - testAuditWorkload(); + testAuditWorkloadWithOutput(auditOutputPath); } @Test - public void testAuditWorkloadHiveParser() throws Exception { - String workloadInputPath = TestWorkloadGenerator.class.getClassLoader() - .getResource("audit_trace_hive").toString(); + public void testAuditWorkloadHiveParserWithOutput() throws Exception { + String workloadInputPath = + TestWorkloadGenerator.class.getClassLoader() + .getResource("audit_trace_hive").toString(); + String auditOutputPath = "/tmp/trace_output_hive"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY, AuditLogHiveTableParser.class, AuditCommandParser.class); - testAuditWorkload(); + testAuditWorkloadWithOutput(auditOutputPath); } /** @@ -114,7 +128,8 @@ public class TestWorkloadGenerator { } } - private void testAuditWorkload() throws Exception { + private void testAuditWorkloadWithOutput(String auditOutputPath) + throws Exception { long workloadStartTime = System.currentTimeMillis() + 10000; Job workloadJob = WorkloadDriver.getJobForSubmission(conf, dfs.getUri().toString(), workloadStartTime, AuditReplayMapper.class); @@ -132,5 +147,18 @@ public class TestWorkloadGenerator { assertTrue( dfs.getFileStatus(new Path("/tmp/testDirRenamed")).isDirectory()); assertFalse(dfs.exists(new Path("/denied"))); + + assertTrue(dfs.exists(new Path(auditOutputPath))); + try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath, + "part-r-00000"))) { + String auditOutput = IOUtils.toString(auditOutputFile, + StandardCharsets.UTF_8); + Log.info(auditOutput); + assertTrue(auditOutput.matches( + ".*(hdfs,WRITE,[A-Z]+,[13]+,[0-9]+\\n){3}.*")); + // Matches three lines of the format "hdfs,WRITE,name,count,time" + // Using [13] for the count group because each operation is run either + // 1 or 3 times but the output order isn't guaranteed + } } } diff --git a/hadoop-tools/hadoop-dynamometer/src/site/markdown/Dynamometer.md b/hadoop-tools/hadoop-dynamometer/src/site/markdown/Dynamometer.md index fee569a58d4..e6b3136a34a 100644 --- a/hadoop-tools/hadoop-dynamometer/src/site/markdown/Dynamometer.md +++ b/hadoop-tools/hadoop-dynamometer/src/site/markdown/Dynamometer.md @@ -232,6 +232,9 @@ within that file. 
A best effort is made to faithfully replay the audit log events at the same pace at which they originally occurred (optionally, this can be adjusted by specifying `auditreplay.rate-factor`, which is a multiplicative factor applied to the rate of replay, e.g. use 2.0 to replay the events at twice the original speed). +The AuditReplayMapper will output the benchmark results to a file `part-r-00000` in the output directory in CSV format. +Each line is in the format `user,type,operation,numops,cumulativelatency`, e.g. `hdfs,WRITE,MKDIRS,2,150`. + ### Integrated Workload Launch To have the infrastructure application client launch the workload automatically, parameters for the workload job
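After a run (whether the workload job is launched standalone or by the infrastructure client with the new `-workload_output_path` argument), the `part-r-00000` file can be copied off the cluster, e.g. with `hdfs dfs -get`, and post-processed. A hypothetical helper that derives average latency per user/type/operation from the documented CSV format, where the cumulative latency is in milliseconds:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Hypothetical post-processor for a locally copied part-r-00000 file. */
public class PrintAverageLatency {
  public static void main(String[] args) throws IOException {
    // Each line: user,type,operation,numops,cumulativelatency
    for (String line : Files.readAllLines(Paths.get(args[0]),
        StandardCharsets.UTF_8)) {
      String[] f = line.split(",");
      long numOps = Long.parseLong(f[3]);
      long cumulativeLatency = Long.parseLong(f[4]);
      System.out.printf("%s %s %s: %d ops, avg %.1f ms%n",
          f[0], f[1], f[2], numOps, (double) cumulativeLatency / numOps);
    }
  }
}
```

For the example line above it would print `hdfs WRITE MKDIRS: 2 ops, avg 75.0 ms`.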