From 32d215dc89726c87d35a39ab881760cd9b92e4df Mon Sep 17 00:00:00 2001
From: Plamen Jeliazkov
Date: Thu, 19 Feb 2015 00:14:11 -0800
Subject: [PATCH] MAPREDUCE-6228. Add truncate operation to SLive. Contributed
 by Plamen Jeliazkov.

---
 hadoop-mapreduce-project/CHANGES.txt          |   6 +-
 .../hadoop/fs/slive/ArgumentParser.java       |   2 +
 .../hadoop/fs/slive/ConfigExtractor.java      |  59 +++++++++
 .../apache/hadoop/fs/slive/ConfigMerger.java  |  35 ++++++
 .../apache/hadoop/fs/slive/ConfigOption.java  |   9 ++
 .../org/apache/hadoop/fs/slive/Constants.java |   4 +-
 .../hadoop/fs/slive/OperationFactory.java     |   3 +
 .../org/apache/hadoop/fs/slive/TestSlive.java |  27 +++++
 .../apache/hadoop/fs/slive/TruncateOp.java    | 114 ++++++++++++++++++
 9 files changed, 255 insertions(+), 4 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TruncateOp.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 79c14ca7dcd..da119f62a85 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -6,6 +6,10 @@ Release 2.7.0 - UNRELEASED
 
   NEW FEATURES
 
+    MAPREDUCE-6227. DFSIO for truncate. (shv via yliu)
+
+    MAPREDUCE-6228. Add truncate operation to SLive. (Plamen Jeliazkov via shv)
+
   IMPROVEMENTS
 
     MAPREDUCE-6149. Document override log4j.properties in MR job.
@@ -42,8 +46,6 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-5800. Use Job#getInstance instead of deprecated constructors
     (aajisaka)
 
-    MAPREDUCE-6227. DFSIO for truncate. (shv via yliu)
-
     MAPREDUCE-6253. Update use of Iterator to Iterable. (Ray Chiang via devaraj)
 
     MAPREDUCE-5335. Rename Job Tracker terminology in ShuffleSchedulerImpl.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ArgumentParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ArgumentParser.java
index 19a55ff9bda..12df4dc3585 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ArgumentParser.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ArgumentParser.java
@@ -144,6 +144,7 @@ class ArgumentParser {
     cliopt.addOption(ConfigOption.DURATION);
     cliopt.addOption(ConfigOption.EXIT_ON_ERROR);
     cliopt.addOption(ConfigOption.SLEEP_TIME);
+    cliopt.addOption(ConfigOption.TRUNCATE_WAIT);
     cliopt.addOption(ConfigOption.FILES);
     cliopt.addOption(ConfigOption.DIR_SIZE);
     cliopt.addOption(ConfigOption.BASE_DIR);
@@ -167,6 +168,7 @@ class ArgumentParser {
     cliopt.addOption(ConfigOption.READ_SIZE);
     cliopt.addOption(ConfigOption.WRITE_SIZE);
     cliopt.addOption(ConfigOption.APPEND_SIZE);
+    cliopt.addOption(ConfigOption.TRUNCATE_SIZE);
     cliopt.addOption(ConfigOption.RANDOM_SEED);
     cliopt.addOption(ConfigOption.QUEUE_NAME);
     cliopt.addOption(ConfigOption.HELP);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigExtractor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigExtractor.java
index a03c8126728..ef4e436a7df 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigExtractor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigExtractor.java
@@ -130,6 +130,32 @@ class ConfigExtractor {
     return Boolean.parseBoolean(val);
   }
 
+  /**
+   * @return whether the mapper or reducer should wait for truncate recovery
+   */
+  boolean shouldWaitOnTruncate() {
+    return shouldWaitOnTruncate(null);
+  }
+
+  /**
+   * @param primary
+   *          the initial string to be used for the value of this
+   *          configuration option (if not provided then config and then the
+   *          default are used)
+   *
+   * @return whether the mapper or reducer should wait for truncate recovery
+   */
+  boolean shouldWaitOnTruncate(String primary) {
+    String val = primary;
+    if (val == null) {
+      val = config.get(ConfigOption.TRUNCATE_WAIT.getCfgOption());
+    }
+    if (val == null) {
+      return ConfigOption.TRUNCATE_WAIT.getDefault();
+    }
+    return Boolean.parseBoolean(val);
+  }
+
   /**
    * @return the number of reducers to use
    */
@@ -533,6 +559,24 @@ class ConfigExtractor {
     return getAppendSize(null);
   }
 
+  /**
+   * @param primary
+   *          the initial string to be used for the value of this configuration
+   *          option (if not provided then config and then the default are used)
+   * @return the truncate byte size range (or null if none)
+   */
+  Range<Long> getTruncateSize(String primary) {
+    return getMinMaxBytes(ConfigOption.TRUNCATE_SIZE, primary);
+  }
+
+  /**
+   * @return the truncate byte size range (or null if none) using config and
+   *         default for lookup
+   */
+  Range<Long> getTruncateSize() {
+    return getTruncateSize(null);
+  }
+
   /**
    * @param primary
    *          the initial string to be used for the value of this configuration
@@ -599,6 +643,21 @@ class ConfigExtractor {
     return false;
   }
 
+  /**
+   * Returns whether the truncate range should use the block size range
+   *
+   * @return true|false
+   */
+  boolean shouldTruncateUseBlockSize() {
+    Range<Long> truncateRange = getTruncateSize();
+    if (truncateRange == null
+        || (truncateRange.getLower() == truncateRange.getUpper()
+            && (truncateRange.getUpper() == Long.MAX_VALUE))) {
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Returns whether the read range should use the entire file
    *
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigMerger.java
index b7be8d85e20..4bb3500e6f6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigMerger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigMerger.java
@@ -282,6 +282,18 @@ class ConfigMerger {
             "Error extracting & merging exit on error value", e);
       }
     }
+    // overwrite the truncate wait setting
+    {
+      try {
+        boolean waitOnTruncate = extractor.shouldWaitOnTruncate(opts
+            .getValue(ConfigOption.TRUNCATE_WAIT.getOpt()));
+        base.setBoolean(ConfigOption.TRUNCATE_WAIT.getCfgOption(),
+            waitOnTruncate);
+      } catch (Exception e) {
+        throw new ConfigException(
+            "Error extracting & merging wait on truncate value", e);
+      }
+    }
     // verify and set file limit and ensure > 0
     {
       Integer fileAm = null;
@@ -553,6 +565,29 @@ class ConfigMerger {
             .set(ConfigOption.APPEND_SIZE.getCfgOption(), appendSize.toString());
       }
     }
+    // set the truncate size range
+    {
+      Range<Long> truncateSize = null;
+      try {
+        truncateSize = extractor.getTruncateSize(opts
+            .getValue(ConfigOption.TRUNCATE_SIZE.getOpt()));
+      } catch (Exception e) {
+        throw new ConfigException(
+            "Error extracting & merging truncate size range", e);
+      }
+      if (truncateSize != null) {
+        if (truncateSize.getLower() > truncateSize.getUpper()) {
+          throw new ConfigException(
+              "Truncate size minimum is greater than its maximum");
+        }
+        if (truncateSize.getLower() < 0) {
+          throw new ConfigException(
+              "Truncate size minimum must be greater than or equal to zero");
+        }
+        base
+            .set(ConfigOption.TRUNCATE_SIZE.getCfgOption(), truncateSize.toString());
+      }
+    }
     // set the seed
     {
       Long seed = null;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigOption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigOption.java
index 340473a7774..bd663364cde 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigOption.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/ConfigOption.java
@@ -97,6 +97,15 @@ class ConfigOption<T> extends Option {
       "Min,max for size to append (min=max=MAX_LONG=blocksize)", SLIVE_PREFIX
           + ".op.append.size", null);
 
+  static final ConfigOption<Boolean> TRUNCATE_WAIT = new ConfigOption<Boolean>(
+      "truncateWait", true, "Should wait for truncate recovery", SLIVE_PREFIX
+          + ".op.truncate.wait", true);
+
+  static final ConfigOption<Long> TRUNCATE_SIZE = new ConfigOption<Long>(
+      "truncateSize", true,
+      "Min,max for size to truncate (min=max=MAX_LONG=blocksize)", SLIVE_PREFIX
+          + ".op.truncate.size", null);
+
   static final ConfigOption<Long> RANDOM_SEED = new ConfigOption<Long>(
       "seed", true, "Random number seed", SLIVE_PREFIX + ".seed", null);
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Constants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Constants.java
index 6f4f44213b8..0642052e0f1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Constants.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Constants.java
@@ -43,7 +43,7 @@ class Constants {
    * Allowed operation types
    */
   enum OperationType {
-    READ, APPEND, RENAME, LS, MKDIR, DELETE, CREATE;
+    READ, APPEND, RENAME, LS, MKDIR, DELETE, CREATE, TRUNCATE;
     String lowerName() {
       return this.name().toLowerCase();
     }
@@ -51,7 +51,7 @@ class Constants {
 
   // program info
   static final String PROG_NAME = SliveTest.class.getSimpleName();
-  static final String PROG_VERSION = "0.0.2";
+  static final String PROG_VERSION = "0.1.0";
 
   // useful constants
   static final int MEGABYTES = 1048576;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/OperationFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/OperationFactory.java
index 52a4c9f87b5..6af825f7357 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/OperationFactory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/OperationFactory.java
@@ -75,6 +75,9 @@ class OperationFactory {
     case CREATE:
       op = new CreateOp(this.config, rnd);
       break;
+    case TRUNCATE:
+      op = new TruncateOp(this.config, rnd);
+      break;
     }
     typedOperations.put(type, op);
     return op;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java
index 3db7695fc39..25e3340e20d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java
@@ -136,6 +136,8 @@ public class TestSlive {
       args.add("10");
       args.add("-" + ConfigOption.FILES.getOpt());
       args.add("10");
+      args.add("-" + ConfigOption.TRUNCATE_SIZE.getOpt());
+      args.add("0,1M");
     }
     return args.toArray(new String[args.size()]);
   }
@@ -237,6 +239,9 @@ public class TestSlive {
     Range<Long> wRange = extractor.getWriteSize();
     assertEquals(wRange.getLower().intValue(), Constants.MEGABYTES * 1);
     assertEquals(wRange.getUpper().intValue(), Constants.MEGABYTES * 2);
+    Range<Long> trRange = extractor.getTruncateSize();
+    assertEquals(trRange.getLower().intValue(), 0);
+    assertEquals(trRange.getUpper().intValue(), Constants.MEGABYTES * 1);
     Range<Long> bRange = extractor.getBlockSize();
     assertEquals(bRange.getLower().intValue(), Constants.MEGABYTES * 1);
     assertEquals(bRange.getUpper().intValue(), Constants.MEGABYTES * 2);
@@ -534,4 +539,26 @@ public class TestSlive {
     };
     runOperationOk(extractor, aop, false);
   }
+
+  @Test
+  public void testTruncateOp() throws Exception {
+    // setup a valid config
+    ConfigExtractor extractor = getTestConfig(false);
+    // ensure a file is created before truncating it
+    final Path fn = new Path(getTestFile().getCanonicalPath());
+    CreateOp op = new CreateOp(extractor, rnd) {
+      protected Path getCreateFile() {
+        return fn;
+      }
+    };
+    runOperationOk(extractor, op, true);
+    // the local file system (ChecksumFileSystem) does not currently support
+    // truncate, so run the operation here anyway but do not check its results
+    TruncateOp top = new TruncateOp(extractor, rnd) {
+      protected Path getTruncateFile() {
+        return fn;
+      }
+    };
+    runOperationOk(extractor, top, false);
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TruncateOp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TruncateOp.java
new file mode 100644
index 00000000000..c845ac11618
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TruncateOp.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.slive.OperationOutput.OutputType;
+
+/**
+ * Operation which selects a random file and truncates a random amount of bytes
+ * (selected from the configuration for truncate size) from that file,
+ * if it exists.
+ *
+ * This operation will capture statistics on success for bytes written, time
+ * taken (milliseconds), and success count and on failure it will capture the
+ * number of failures and the time taken (milliseconds) to fail.
+ */
+class TruncateOp extends Operation {
+
+  private static final Log LOG = LogFactory.getLog(TruncateOp.class);
+
+  TruncateOp(ConfigExtractor cfg, Random rnd) {
+    super(TruncateOp.class.getSimpleName(), cfg, rnd);
+  }
+
+  /**
+   * Gets the file to truncate
+   *
+   * @return Path
+   */
+  protected Path getTruncateFile() {
+    Path fn = getFinder().getFile();
+    return fn;
+  }
+
+  @Override // Operation
+  List<OperationOutput> run(FileSystem fs) {
+    List<OperationOutput> out = super.run(fs);
+    try {
+      Path fn = getTruncateFile();
+      boolean waitOnTruncate = getConfig().shouldWaitOnTruncate();
+      long currentSize = fs.getFileStatus(fn).getLen();
+      // determine the current file length so that a valid
+      // target size for the truncate can be selected
+      Range<Long> truncateSizeRange = getConfig().getTruncateSize();
+      if (getConfig().shouldTruncateUseBlockSize()) {
+        truncateSizeRange = getConfig().getBlockSize();
+      }
+      long truncateSize = Math.max(0L,
+          currentSize - Range.betweenPositive(getRandom(), truncateSizeRange));
+      long timeTaken = 0;
+      LOG.info("Attempting to truncate file at " + fn + " to size "
+          + Helper.toByteInfo(truncateSize));
+      {
+        // truncate
+        long startTime = Timer.now();
+        boolean completed = fs.truncate(fn, truncateSize);
+        if (!completed && waitOnTruncate)
+          waitForRecovery(fs, fn, truncateSize);
+        timeTaken += Timer.elapsed(startTime);
+      }
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.BYTES_WRITTEN, 0));
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.OK_TIME_TAKEN, timeTaken));
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.SUCCESSES, 1L));
+      LOG.info("Truncated file " + fn + " to " + Helper.toByteInfo(truncateSize)
+          + " in " + timeTaken + " milliseconds");
+    } catch (FileNotFoundException e) {
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.NOT_FOUND, 1L));
+      LOG.warn("Error with truncating", e);
+    } catch (IOException e) {
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.FAILURES, 1L));
+      LOG.warn("Error with truncating", e);
+    }
+    return out;
+  }
+
+  private void waitForRecovery(FileSystem fs, Path fn, long newLength)
+      throws IOException {
+    LOG.info("Waiting on truncate file recovery for " + fn);
+    for (;;) {
+      FileStatus stat = fs.getFileStatus(fn);
+      if (stat.getLen() == newLength) break;
+      try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
+    }
+  }
+}
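
Example usage (an illustrative sketch, not part of the patch): assuming the
jobclient tests jar is available and "slive" is the test-driver alias for
SliveTest, a run exercising the new operation might look like the following.
The per-operation "-truncate" option is assumed to take the same
"percent,distribution" value as the other operation options, while
-truncateSize and -truncateWait come from the ConfigOption entries added
above.

    hadoop jar hadoop-mapreduce-client-jobclient-*-tests.jar slive \
      -maps 10 -reduces 1 -duration 60 \
      -create 30,uniform -read 40,uniform -truncate 30,uniform \
      -truncateSize 0,1M \
      -truncateWait true

The "0,1M" range mirrors the value used in TestSlive above; when
-truncateSize is left unset (min=max=Long.MAX_VALUE), the
shouldTruncateUseBlockSize() check makes TruncateOp fall back to the
configured block size range when choosing how many bytes to cut off.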