MAPREDUCE-6228. Add truncate operation to SLive. Contributed by Plamen Jeliazkov.
This commit is contained in:
parent
10da1bfce2
commit
32d215dc89
|
@ -6,6 +6,10 @@ Release 2.7.0 - UNRELEASED
|
|||
|
||||
NEW FEATURES
|
||||
|
||||
MAPREDUCE-6227. DFSIO for truncate. (shv via yliu)
|
||||
|
||||
MAPREDUCE-6228. Add truncate operation to SLive. (Plamen Jeliazkov via shv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
MAPREDUCE-6149. Document override log4j.properties in MR job.
|
||||
|
@ -42,8 +46,6 @@ Release 2.7.0 - UNRELEASED
|
|||
MAPREDUCE-5800. Use Job#getInstance instead of deprecated constructors
|
||||
(aajisaka)
|
||||
|
||||
MAPREDUCE-6227. DFSIO for truncate. (shv via yliu)
|
||||
|
||||
MAPREDUCE-6253. Update use of Iterator to Iterable. (Ray Chiang via devaraj)
|
||||
|
||||
MAPREDUCE-5335. Rename Job Tracker terminology in ShuffleSchedulerImpl.
|
||||
|
|
|
@ -144,6 +144,7 @@ class ArgumentParser {
|
|||
cliopt.addOption(ConfigOption.DURATION);
|
||||
cliopt.addOption(ConfigOption.EXIT_ON_ERROR);
|
||||
cliopt.addOption(ConfigOption.SLEEP_TIME);
|
||||
cliopt.addOption(ConfigOption.TRUNCATE_WAIT);
|
||||
cliopt.addOption(ConfigOption.FILES);
|
||||
cliopt.addOption(ConfigOption.DIR_SIZE);
|
||||
cliopt.addOption(ConfigOption.BASE_DIR);
|
||||
|
@ -167,6 +168,7 @@ class ArgumentParser {
|
|||
cliopt.addOption(ConfigOption.READ_SIZE);
|
||||
cliopt.addOption(ConfigOption.WRITE_SIZE);
|
||||
cliopt.addOption(ConfigOption.APPEND_SIZE);
|
||||
cliopt.addOption(ConfigOption.TRUNCATE_SIZE);
|
||||
cliopt.addOption(ConfigOption.RANDOM_SEED);
|
||||
cliopt.addOption(ConfigOption.QUEUE_NAME);
|
||||
cliopt.addOption(ConfigOption.HELP);
|
||||
|
|
|
@ -130,6 +130,32 @@ class ConfigExtractor {
|
|||
return Boolean.parseBoolean(val);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether the mapper or reducer should wait for truncate recovery
|
||||
*/
|
||||
boolean shouldWaitOnTruncate() {
|
||||
return shouldWaitOnTruncate(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param primary
|
||||
* primary the initial string to be used for the value of this
|
||||
* configuration option (if not provided then config and then the
|
||||
* default are used)
|
||||
*
|
||||
* @return whether the mapper or reducer should wait for truncate recovery
|
||||
*/
|
||||
boolean shouldWaitOnTruncate(String primary) {
|
||||
String val = primary;
|
||||
if (val == null) {
|
||||
val = config.get(ConfigOption.EXIT_ON_ERROR.getCfgOption());
|
||||
}
|
||||
if (val == null) {
|
||||
return ConfigOption.EXIT_ON_ERROR.getDefault();
|
||||
}
|
||||
return Boolean.parseBoolean(val);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of reducers to use
|
||||
*/
|
||||
|
@ -533,6 +559,24 @@ class ConfigExtractor {
|
|||
return getAppendSize(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param primary
|
||||
* the initial string to be used for the value of this configuration
|
||||
* option (if not provided then config and then the default are used)
|
||||
* @return the truncate byte size range (or null if none)
|
||||
*/
|
||||
Range<Long> getTruncateSize(String primary) {
|
||||
return getMinMaxBytes(ConfigOption.TRUNCATE_SIZE, primary);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the truncate byte size range (or null if none) using config and
|
||||
* default for lookup
|
||||
*/
|
||||
Range<Long> getTruncateSize() {
|
||||
return getTruncateSize(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param primary
|
||||
* the initial string to be used for the value of this configuration
|
||||
|
@ -599,6 +643,21 @@ class ConfigExtractor {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the truncate range should use the block size range
|
||||
*
|
||||
* @return true|false
|
||||
*/
|
||||
boolean shouldTruncateUseBlockSize() {
|
||||
Range<Long> truncateRange = getTruncateSize();
|
||||
if (truncateRange == null
|
||||
|| (truncateRange.getLower() == truncateRange.getUpper()
|
||||
&& (truncateRange.getUpper() == Long.MAX_VALUE))) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the read range should use the entire file
|
||||
*
|
||||
|
|
|
@ -282,6 +282,18 @@ class ConfigMerger {
|
|||
"Error extracting & merging exit on error value", e);
|
||||
}
|
||||
}
|
||||
// overwrite the truncate wait setting
|
||||
{
|
||||
try {
|
||||
boolean waitOnTruncate = extractor.shouldWaitOnTruncate(opts
|
||||
.getValue(ConfigOption.TRUNCATE_WAIT.getOpt()));
|
||||
base.setBoolean(ConfigOption.TRUNCATE_WAIT.getCfgOption(),
|
||||
waitOnTruncate);
|
||||
} catch (Exception e) {
|
||||
throw new ConfigException(
|
||||
"Error extracting & merging wait on truncate value", e);
|
||||
}
|
||||
}
|
||||
// verify and set file limit and ensure > 0
|
||||
{
|
||||
Integer fileAm = null;
|
||||
|
@ -553,6 +565,29 @@ class ConfigMerger {
|
|||
.set(ConfigOption.APPEND_SIZE.getCfgOption(), appendSize.toString());
|
||||
}
|
||||
}
|
||||
// set the truncate size range
|
||||
{
|
||||
Range<Long> truncateSize = null;
|
||||
try {
|
||||
truncateSize = extractor.getTruncateSize(opts
|
||||
.getValue(ConfigOption.TRUNCATE_SIZE.getOpt()));
|
||||
} catch (Exception e) {
|
||||
throw new ConfigException(
|
||||
"Error extracting & merging truncate size range", e);
|
||||
}
|
||||
if (truncateSize != null) {
|
||||
if (truncateSize.getLower() > truncateSize.getUpper()) {
|
||||
throw new ConfigException(
|
||||
"Truncate size minimum is greater than its maximum");
|
||||
}
|
||||
if (truncateSize.getLower() < 0) {
|
||||
throw new ConfigException(
|
||||
"Truncate size minimum must be greater than or equal to zero");
|
||||
}
|
||||
base
|
||||
.set(ConfigOption.TRUNCATE_SIZE.getCfgOption(), truncateSize.toString());
|
||||
}
|
||||
}
|
||||
// set the seed
|
||||
{
|
||||
Long seed = null;
|
||||
|
|
|
@ -97,6 +97,15 @@ class ConfigOption<T> extends Option {
|
|||
"Min,max for size to append (min=max=MAX_LONG=blocksize)", SLIVE_PREFIX
|
||||
+ ".op.append.size", null);
|
||||
|
||||
static final ConfigOption<Boolean> TRUNCATE_WAIT = new ConfigOption<Boolean>(
|
||||
"truncateWait", true, "Should wait for truncate recovery", SLIVE_PREFIX
|
||||
+ ".op.truncate.wait", true);
|
||||
|
||||
static final ConfigOption<Long> TRUNCATE_SIZE = new ConfigOption<Long>(
|
||||
"truncateSize", true,
|
||||
"Min,max for size to truncate (min=max=MAX_LONG=blocksize)", SLIVE_PREFIX
|
||||
+ ".op.truncate.size", null);
|
||||
|
||||
static final ConfigOption<Long> RANDOM_SEED = new ConfigOption<Long>(
|
||||
"seed", true, "Random number seed", SLIVE_PREFIX + ".seed", null);
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ class Constants {
|
|||
* Allowed operation types
|
||||
*/
|
||||
enum OperationType {
|
||||
READ, APPEND, RENAME, LS, MKDIR, DELETE, CREATE;
|
||||
READ, APPEND, RENAME, LS, MKDIR, DELETE, CREATE, TRUNCATE;
|
||||
String lowerName() {
|
||||
return this.name().toLowerCase();
|
||||
}
|
||||
|
@ -51,7 +51,7 @@ class Constants {
|
|||
|
||||
// program info
|
||||
static final String PROG_NAME = SliveTest.class.getSimpleName();
|
||||
static final String PROG_VERSION = "0.0.2";
|
||||
static final String PROG_VERSION = "0.1.0";
|
||||
|
||||
// useful constants
|
||||
static final int MEGABYTES = 1048576;
|
||||
|
|
|
@ -75,6 +75,9 @@ class OperationFactory {
|
|||
case CREATE:
|
||||
op = new CreateOp(this.config, rnd);
|
||||
break;
|
||||
case TRUNCATE:
|
||||
op = new TruncateOp(this.config, rnd);
|
||||
break;
|
||||
}
|
||||
typedOperations.put(type, op);
|
||||
return op;
|
||||
|
|
|
@ -136,6 +136,8 @@ public class TestSlive {
|
|||
args.add("10");
|
||||
args.add("-" + ConfigOption.FILES.getOpt());
|
||||
args.add("10");
|
||||
args.add("-" + ConfigOption.TRUNCATE_SIZE.getOpt());
|
||||
args.add("0,1M");
|
||||
}
|
||||
return args.toArray(new String[args.size()]);
|
||||
}
|
||||
|
@ -237,6 +239,9 @@ public class TestSlive {
|
|||
Range<Long> wRange = extractor.getWriteSize();
|
||||
assertEquals(wRange.getLower().intValue(), Constants.MEGABYTES * 1);
|
||||
assertEquals(wRange.getUpper().intValue(), Constants.MEGABYTES * 2);
|
||||
Range<Long> trRange = extractor.getTruncateSize();
|
||||
assertEquals(trRange.getLower().intValue(), 0);
|
||||
assertEquals(trRange.getUpper().intValue(), Constants.MEGABYTES * 1);
|
||||
Range<Long> bRange = extractor.getBlockSize();
|
||||
assertEquals(bRange.getLower().intValue(), Constants.MEGABYTES * 1);
|
||||
assertEquals(bRange.getUpper().intValue(), Constants.MEGABYTES * 2);
|
||||
|
@ -534,4 +539,26 @@ public class TestSlive {
|
|||
};
|
||||
runOperationOk(extractor, aop, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTruncateOp() throws Exception {
|
||||
// setup a valid config
|
||||
ConfigExtractor extractor = getTestConfig(false);
|
||||
// ensure file created before append
|
||||
final Path fn = new Path(getTestFile().getCanonicalPath());
|
||||
CreateOp op = new CreateOp(extractor, rnd) {
|
||||
protected Path getCreateFile() {
|
||||
return fn;
|
||||
}
|
||||
};
|
||||
runOperationOk(extractor, op, true);
|
||||
// local file system (ChecksumFileSystem) currently doesn't support truncate -
|
||||
// but we'll leave this test here anyways but can't check the results..
|
||||
TruncateOp top = new TruncateOp(extractor, rnd) {
|
||||
protected Path getTruncateFile() {
|
||||
return fn;
|
||||
}
|
||||
};
|
||||
runOperationOk(extractor, top, false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.slive;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.slive.OperationOutput.OutputType;
|
||||
|
||||
/**
|
||||
* Operation which selects a random file and truncates a random amount of bytes
|
||||
* (selected from the configuration for truncate size) from that file,
|
||||
* if it exists.
|
||||
*
|
||||
* This operation will capture statistics on success for bytes written, time
|
||||
* taken (milliseconds), and success count and on failure it will capture the
|
||||
* number of failures and the time taken (milliseconds) to fail.
|
||||
*/
|
||||
class TruncateOp extends Operation {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TruncateOp.class);
|
||||
|
||||
TruncateOp(ConfigExtractor cfg, Random rnd) {
|
||||
super(TruncateOp.class.getSimpleName(), cfg, rnd);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the file to truncate from
|
||||
*
|
||||
* @return Path
|
||||
*/
|
||||
protected Path getTruncateFile() {
|
||||
Path fn = getFinder().getFile();
|
||||
return fn;
|
||||
}
|
||||
|
||||
@Override // Operation
|
||||
List<OperationOutput> run(FileSystem fs) {
|
||||
List<OperationOutput> out = super.run(fs);
|
||||
try {
|
||||
Path fn = getTruncateFile();
|
||||
boolean waitOnTruncate = getConfig().shouldWaitOnTruncate();
|
||||
long currentSize = fs.getFileStatus(fn).getLen();
|
||||
// determine file status for file length requirement
|
||||
// to know if should fill in partial bytes
|
||||
Range<Long> truncateSizeRange = getConfig().getTruncateSize();
|
||||
if (getConfig().shouldTruncateUseBlockSize()) {
|
||||
truncateSizeRange = getConfig().getBlockSize();
|
||||
}
|
||||
long truncateSize = Math.max(0L,
|
||||
currentSize - Range.betweenPositive(getRandom(), truncateSizeRange));
|
||||
long timeTaken = 0;
|
||||
LOG.info("Attempting to truncate file at " + fn + " to size "
|
||||
+ Helper.toByteInfo(truncateSize));
|
||||
{
|
||||
// truncate
|
||||
long startTime = Timer.now();
|
||||
boolean completed = fs.truncate(fn, truncateSize);
|
||||
if(!completed && waitOnTruncate)
|
||||
waitForRecovery(fs, fn, truncateSize);
|
||||
timeTaken += Timer.elapsed(startTime);
|
||||
}
|
||||
out.add(new OperationOutput(OutputType.LONG, getType(),
|
||||
ReportWriter.BYTES_WRITTEN, 0));
|
||||
out.add(new OperationOutput(OutputType.LONG, getType(),
|
||||
ReportWriter.OK_TIME_TAKEN, timeTaken));
|
||||
out.add(new OperationOutput(OutputType.LONG, getType(),
|
||||
ReportWriter.SUCCESSES, 1L));
|
||||
LOG.info("Truncate file " + fn + " to " + Helper.toByteInfo(truncateSize)
|
||||
+ " in " + timeTaken + " milliseconds");
|
||||
} catch (FileNotFoundException e) {
|
||||
out.add(new OperationOutput(OutputType.LONG, getType(),
|
||||
ReportWriter.NOT_FOUND, 1L));
|
||||
LOG.warn("Error with truncating", e);
|
||||
} catch (IOException e) {
|
||||
out.add(new OperationOutput(OutputType.LONG, getType(),
|
||||
ReportWriter.FAILURES, 1L));
|
||||
LOG.warn("Error with truncating", e);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
private void waitForRecovery(FileSystem fs, Path fn, long newLength)
|
||||
throws IOException {
|
||||
LOG.info("Waiting on truncate file recovery for " + fn);
|
||||
for(;;) {
|
||||
FileStatus stat = fs.getFileStatus(fn);
|
||||
if(stat.getLen() == newLength) break;
|
||||
try {Thread.sleep(1000);} catch(InterruptedException ignored) {}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue