HDFS-2261. AOP unit tests are not getting compiled or run. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-11-09 15:25:19 -08:00
parent 6bca317157
commit c91bd3aabb
28 changed files with 2 additions and 3706 deletions

View File

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fi;
import org.apache.hadoop.conf.Configuration;
/**
* This class wraps the logic around fault injection configuration file
* Default file is expected to be found in src/test/fi-site.xml
* This default file should be copied by JUnit Ant's tasks to
* build/test/extraconf folder before tests are ran
* An alternative location can be set through
* -Dfi.config=<file_name>
*/
public class FiConfig {
private static final String CONFIG_PARAMETER = ProbabilityModel.FPROB_NAME + "config";
private static final String DEFAULT_CONFIG = "fi-site.xml";
private static Configuration conf;
static {
if (conf == null) {
conf = new Configuration(false);
String configName = System.getProperty(CONFIG_PARAMETER, DEFAULT_CONFIG);
conf.addResource(configName);
}
}
/**
* Method provides access to local Configuration
*
* @return Configuration initialized with fault injection's parameters
*/
public static Configuration getConfig() {
return conf;
}
}

View File

@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fi;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
/**
* This class is responsible for the decision of when a fault
* has to be triggered within a class of Hadoop
*
* Default probability of injection is set to 0%. To change it
* one can set the sys. prop. -Dfi.*=<new probability level>
* Another way to do so is to set this level through FI config file,
* located under src/test/fi-site.conf
*
* To change the level one has to specify the following sys,prop.:
* -Dfi.<name of fault location>=<probability level> in the runtime
* Probability level is specified by a float between 0.0 and 1.0
*
* <name of fault location> might be represented by a short classname
* or otherwise. This decision is left up to the discretion of aspects
* developer, but has to be consistent through the code
*/
public class ProbabilityModel {
private static Random generator = new Random();
private static final Log LOG = LogFactory.getLog(ProbabilityModel.class);
static final String FPROB_NAME = "fi.";
private static final String ALL_PROBABILITIES = FPROB_NAME + "*";
private static final float DEFAULT_PROB = 0.00f; //Default probability is 0%
private static final float MAX_PROB = 1.00f; // Max probability is 100%
private static Configuration conf = FiConfig.getConfig();
static {
// Set new default probability if specified through a system.property
// If neither is specified set default probability to DEFAULT_PROB
conf.set(ALL_PROBABILITIES,
System.getProperty(ALL_PROBABILITIES,
conf.get(ALL_PROBABILITIES, Float.toString(DEFAULT_PROB))));
LOG.info(ALL_PROBABILITIES + "=" + conf.get(ALL_PROBABILITIES));
}
/**
* Simplistic method to check if we have reached the point of injection
* @param klassName is the name of the probability level to check.
* If a configuration has been set for "fi.myClass" then you can check if the
* inject criteria has been reached by calling this method with "myClass"
* string as its parameter
* @return true if the probability threshold has been reached; false otherwise
*/
public static boolean injectCriteria(String klassName) {
boolean trigger = false;
if (generator.nextFloat() < getProbability(klassName)) {
trigger = true;
}
return trigger;
}
/**
* This primitive checks for arbitrary set of desired probability. If the
* level hasn't been set method will return default setting.
* The probability expected to be set as an float between 0.0 and 1.0
* @param klass is the name of the resource
* @return float representation of configured probability level of
* the requested resource or default value if hasn't been set
*/
protected static float getProbability(final String klass) {
String newProbName = FPROB_NAME + klass;
String newValue = System.getProperty(newProbName, conf.get(ALL_PROBABILITIES));
if (newValue != null && !newValue.equals(conf.get(newProbName)))
conf.set(newProbName, newValue);
float ret = conf.getFloat(newProbName,
conf.getFloat(ALL_PROBABILITIES, DEFAULT_PROB));
if(LOG.isDebugEnabled()) {
LOG.debug("Request for " + newProbName + " returns=" + ret);
}
// Make sure that probability level is valid.
if (ret < DEFAULT_PROB || ret > MAX_PROB) {
LOG.info("Probability level is incorrect. Default value is set");
ret = conf.getFloat(ALL_PROBABILITIES, DEFAULT_PROB);
}
return ret;
}
}

View File

@ -791,6 +791,8 @@ Release 2.8.0 - UNRELEASED
HDFS-9398. Make ByteArraryManager log message in one-line format.
(Mingliang Liu via szetszwo)
HDFS-2261. AOP unit tests are not getting compiled or run. (wheat9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -375,7 +375,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<exclude>src/test/resources/data*</exclude>
<exclude>src/test/resources/editsStored*</exclude>
<exclude>src/test/resources/empty-file</exclude>
<exclude>src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataXceiverAspects.aj</exclude>
<exclude>src/main/webapps/datanode/robots.txt</exclude>
<exclude>src/main/docs/releasenotes.html</exclude>
<exclude>src/contrib/**</exclude>

View File

@ -1,485 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fi.FiTestUtil.Action;
import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
import org.apache.hadoop.fi.FiTestUtil.CountdownConstraint;
import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
/**
* Utilities for DataTransferProtocol related tests,
* e.g. TestFiDataTransferProtocol.
*/
public class DataTransferTestUtil {
protected static PipelineTest thepipelinetest;
/** initialize pipeline test */
public static PipelineTest initTest() {
return thepipelinetest = new DataTransferTest();
}
/** get the pipeline test object */
public static PipelineTest getPipelineTest() {
return thepipelinetest;
}
/** get the pipeline test object cast to DataTransferTest */
public static DataTransferTest getDataTransferTest() {
return (DataTransferTest)getPipelineTest();
}
/**
* The DataTransferTest class includes a pipeline
* and some actions.
*/
public static class DataTransferTest implements PipelineTest {
private final List<Pipeline> pipelines = new ArrayList<Pipeline>();
private volatile boolean isSuccess = false;
/** Simulate action for the receiverOpWriteBlock pointcut */
public final ActionContainer<DatanodeID, IOException> fiReceiverOpWriteBlock
= new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the callReceivePacket pointcut */
public final ActionContainer<DatanodeID, IOException> fiCallReceivePacket
= new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the callWritePacketToDisk pointcut */
public final ActionContainer<DatanodeID, IOException> fiCallWritePacketToDisk
= new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the statusRead pointcut */
public final ActionContainer<DatanodeID, IOException> fiStatusRead
= new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the afterDownstreamStatusRead pointcut */
public final ActionContainer<DatanodeID, IOException> fiAfterDownstreamStatusRead
= new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the pipelineAck pointcut */
public final ActionContainer<DatanodeID, IOException> fiPipelineAck
= new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the pipelineClose pointcut */
public final ActionContainer<DatanodeID, IOException> fiPipelineClose
= new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the blockFileClose pointcut */
public final ActionContainer<DatanodeID, IOException> fiBlockFileClose
= new ActionContainer<DatanodeID, IOException>();
/** Verification action for the pipelineInitNonAppend pointcut */
public final ActionContainer<Integer, RuntimeException> fiPipelineInitErrorNonAppend
= new ActionContainer<Integer, RuntimeException>();
/** Verification action for the pipelineErrorAfterInit pointcut */
public final ActionContainer<Integer, RuntimeException> fiPipelineErrorAfterInit
= new ActionContainer<Integer, RuntimeException>();
/** Get test status */
public boolean isSuccess() {
return this.isSuccess;
}
/** Set test status */
public void markSuccess() {
this.isSuccess = true;
}
/** Initialize the pipeline. */
@Override
public synchronized Pipeline initPipeline(LocatedBlock lb) {
final Pipeline pl = new Pipeline(lb);
if (pipelines.contains(pl)) {
throw new IllegalStateException("thepipeline != null");
}
pipelines.add(pl);
return pl;
}
/** Return the pipeline for the datanode. */
@Override
public synchronized Pipeline getPipelineForDatanode(DatanodeID id) {
for (Pipeline p : pipelines) {
if (p.contains(id)){
return p;
}
}
FiTestUtil.LOG.info("FI: pipeline not found; id=" + id
+ ", pipelines=" + pipelines);
return null;
}
/**
* Is the test not yet success
* and the last pipeline contains the given datanode?
*/
private synchronized boolean isNotSuccessAndLastPipelineContains(
int index, DatanodeID id) {
if (isSuccess()) {
return false;
}
final int n = pipelines.size();
return n == 0? false: pipelines.get(n-1).contains(index, id);
}
}
/** Action for DataNode */
public static abstract class DataNodeAction implements
Action<DatanodeID, IOException> {
/** The name of the test */
final String currentTest;
/** The index of the datanode */
final int index;
/**
* @param currentTest The name of the test
* @param index The index of the datanode
*/
protected DataNodeAction(String currentTest, int index) {
this.currentTest = currentTest;
this.index = index;
}
/** {@inheritDoc} */
public String toString() {
return getClass().getSimpleName() + ":" + currentTest
+ ", index=" + index;
}
/** return a String with this object and the datanodeID. */
String toString(DatanodeID datanodeID) {
return "FI: " + this + ", datanode="
+ datanodeID.getName();
}
}
/** An action to set a marker if the DatanodeID is matched. */
public static class DatanodeMarkingAction extends DataNodeAction {
private final MarkerConstraint marker;
/** Construct an object. */
public DatanodeMarkingAction(String currentTest, int index,
MarkerConstraint marker) {
super(currentTest, index);
this.marker = marker;
}
/** Set the marker if the DatanodeID is matched. */
@Override
public void run(DatanodeID datanodeid) throws IOException {
final DataTransferTest test = getDataTransferTest();
if (test.isNotSuccessAndLastPipelineContains(index, datanodeid)) {
marker.mark();
}
}
/** {@inheritDoc} */
public String toString() {
return super.toString() + ", " + marker;
}
}
/** Throws OutOfMemoryError. */
public static class OomAction extends DataNodeAction {
/** Create an action for datanode i in the pipeline. */
public OomAction(String currentTest, int i) {
super(currentTest, i);
}
@Override
public void run(DatanodeID id) {
final DataTransferTest test = getDataTransferTest();
if (test.isNotSuccessAndLastPipelineContains(index, id)) {
final String s = toString(id);
FiTestUtil.LOG.info(s);
throw new OutOfMemoryError(s);
}
}
}
/** Throws OutOfMemoryError if the count is zero. */
public static class CountdownOomAction extends OomAction {
private final CountdownConstraint countdown;
/** Create an action for datanode i in the pipeline with count down. */
public CountdownOomAction(String currentTest, int i, int count) {
super(currentTest, i);
countdown = new CountdownConstraint(count);
}
@Override
public void run(DatanodeID id) {
final DataTransferTest test = getDataTransferTest();
if (test.isNotSuccessAndLastPipelineContains(index, id)
&& countdown.isSatisfied()) {
final String s = toString(id);
FiTestUtil.LOG.info(s);
throw new OutOfMemoryError(s);
}
}
}
/** Throws DiskOutOfSpaceException. */
public static class DoosAction extends DataNodeAction {
/** Create an action for datanode i in the pipeline. */
public DoosAction(String currentTest, int i) {
super(currentTest, i);
}
@Override
public void run(DatanodeID id) throws DiskOutOfSpaceException {
final DataTransferTest test = getDataTransferTest();
if (test.isNotSuccessAndLastPipelineContains(index, id)) {
final String s = toString(id);
FiTestUtil.LOG.info(s);
throw new DiskOutOfSpaceException(s);
}
}
}
/** Throws an IOException. */
public static class IoeAction extends DataNodeAction {
private final String error;
/** Create an action for datanode i in the pipeline. */
public IoeAction(String currentTest, int i, String error) {
super(currentTest, i);
this.error = error;
}
@Override
public void run(DatanodeID id) throws IOException {
final DataTransferTest test = getDataTransferTest();
if (test.isNotSuccessAndLastPipelineContains(index, id)) {
final String s = toString(id);
FiTestUtil.LOG.info(s);
throw new IOException(s);
}
}
@Override
public String toString() {
return error + " " + super.toString();
}
}
/** Throws DiskOutOfSpaceException if the count is zero. */
public static class CountdownDoosAction extends DoosAction {
private final CountdownConstraint countdown;
/** Create an action for datanode i in the pipeline with count down. */
public CountdownDoosAction(String currentTest, int i, int count) {
super(currentTest, i);
countdown = new CountdownConstraint(count);
}
@Override
public void run(DatanodeID id) throws DiskOutOfSpaceException {
final DataTransferTest test = getDataTransferTest();
if (test.isNotSuccessAndLastPipelineContains(index, id)
&& countdown.isSatisfied()) {
final String s = toString(id);
FiTestUtil.LOG.info(s);
throw new DiskOutOfSpaceException(s);
}
}
}
/**
* Sleep some period of time so that it slows down the datanode
* or sleep forever so that datanode becomes not responding.
*/
public static class SleepAction extends DataNodeAction {
/** In milliseconds;
* must have (0 <= minDuration < maxDuration) or (maxDuration <= 0).
*/
final long minDuration;
/** In milliseconds; maxDuration <= 0 means sleeping forever.*/
final long maxDuration;
/**
* Create an action for datanode i in the pipeline.
* @param duration In milliseconds, duration <= 0 means sleeping forever.
*/
public SleepAction(String currentTest, int i, long duration) {
this(currentTest, i, duration, duration <= 0? duration: duration+1);
}
/**
* Create an action for datanode i in the pipeline.
* @param minDuration minimum sleep time
* @param maxDuration maximum sleep time
*/
public SleepAction(String currentTest, int i,
long minDuration, long maxDuration) {
super(currentTest, i);
if (maxDuration > 0) {
if (minDuration < 0) {
throw new IllegalArgumentException("minDuration = " + minDuration
+ " < 0 but maxDuration = " + maxDuration + " > 0");
}
if (minDuration >= maxDuration) {
throw new IllegalArgumentException(
minDuration + " = minDuration >= maxDuration = " + maxDuration);
}
}
this.minDuration = minDuration;
this.maxDuration = maxDuration;
}
@Override
public void run(DatanodeID id) {
final DataTransferTest test = getDataTransferTest();
if (test.isNotSuccessAndLastPipelineContains(index, id)) {
FiTestUtil.LOG.info(toString(id));
if (maxDuration <= 0) {
for(; FiTestUtil.sleep(1000); ); //sleep forever until interrupt
} else {
FiTestUtil.sleep(minDuration, maxDuration);
}
}
}
/** {@inheritDoc} */
@Override
public String toString() {
return super.toString() + ", duration="
+ (maxDuration <= 0? "infinity": "[" + minDuration + ", " + maxDuration + ")");
}
}
/**
* When the count is zero,
* sleep some period of time so that it slows down the datanode
* or sleep forever so that datanode becomes not responding.
*/
public static class CountdownSleepAction extends SleepAction {
private final CountdownConstraint countdown;
/**
* Create an action for datanode i in the pipeline.
* @param duration In milliseconds, duration <= 0 means sleeping forever.
*/
public CountdownSleepAction(String currentTest, int i,
long duration, int count) {
this(currentTest, i, duration, duration+1, count);
}
/** Create an action for datanode i in the pipeline with count down. */
public CountdownSleepAction(String currentTest, int i,
long minDuration, long maxDuration, int count) {
super(currentTest, i, minDuration, maxDuration);
countdown = new CountdownConstraint(count);
}
@Override
public void run(DatanodeID id) {
final DataTransferTest test = getDataTransferTest();
if (test.isNotSuccessAndLastPipelineContains(index, id)
&& countdown.isSatisfied()) {
final String s = toString(id) + ", duration = ["
+ minDuration + "," + maxDuration + ")";
FiTestUtil.LOG.info(s);
if (maxDuration <= 1) {
for(; FiTestUtil.sleep(1000); ); //sleep forever until interrupt
} else {
FiTestUtil.sleep(minDuration, maxDuration);
}
}
}
}
/** Action for pipeline error verification */
public static class VerificationAction implements
Action<Integer, RuntimeException> {
/** The name of the test */
final String currentTest;
/** The error index of the datanode */
final int errorIndex;
/**
* Create a verification action for errors at datanode i in the pipeline.
*
* @param currentTest The name of the test
* @param i The error index of the datanode
*/
public VerificationAction(String currentTest, int i) {
this.currentTest = currentTest;
this.errorIndex = i;
}
/** {@inheritDoc} */
public String toString() {
return currentTest + ", errorIndex=" + errorIndex;
}
@Override
public void run(Integer i) {
if (i == errorIndex) {
FiTestUtil.LOG.info(this + ", successfully verified.");
getDataTransferTest().markSuccess();
}
}
}
/**
* Create a OomAction with a CountdownConstraint
* so that it throws OutOfMemoryError if the count is zero.
*/
public static ConstraintSatisfactionAction<DatanodeID, IOException>
createCountdownOomAction(
String currentTest, int i, int count) {
return new ConstraintSatisfactionAction<DatanodeID, IOException>(
new OomAction(currentTest, i), new CountdownConstraint(count));
}
/**
* Create a DoosAction with a CountdownConstraint
* so that it throws DiskOutOfSpaceException if the count is zero.
*/
public static ConstraintSatisfactionAction<DatanodeID, IOException>
createCountdownDoosAction(
String currentTest, int i, int count) {
return new ConstraintSatisfactionAction<DatanodeID, IOException>(
new DoosAction(currentTest, i), new CountdownConstraint(count));
}
/**
* Create a SleepAction with a CountdownConstraint
* for datanode i in the pipeline.
* When the count is zero,
* sleep some period of time so that it slows down the datanode
* or sleep forever so the that datanode becomes not responding.
*/
public static ConstraintSatisfactionAction<DatanodeID, IOException> createCountdownSleepAction(
String currentTest, int i, long minDuration, long maxDuration, int count) {
return new ConstraintSatisfactionAction<DatanodeID, IOException>(
new SleepAction(currentTest, i, minDuration, maxDuration),
new CountdownConstraint(count));
}
/**
* Same as
* createCountdownSleepAction(currentTest, i, duration, duration+1, count).
*/
public static ConstraintSatisfactionAction<DatanodeID, IOException> createCountdownSleepAction(
String currentTest, int i, long duration, int count) {
return createCountdownSleepAction(currentTest, i, duration, duration+1,
count);
}
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
/**
* This class wraps the logic around fault injection configuration file
* Default file is expected to be found in src/test/fi-site.xml
* This default file should be copied by JUnit Ant's tasks to
* build/test/extraconf folder before tests are ran
* An alternative location can be set through
* -Dfi.config=<file_name>
*/
public class FiConfig {
private static final String CONFIG_PARAMETER = ProbabilityModel.FPROB_NAME + "config";
private static final String DEFAULT_CONFIG = "fi-site.xml";
private static Configuration conf;
static {
init();
}
protected static void init () {
if (conf == null) {
conf = new HdfsConfiguration(false);
String configName = System.getProperty(CONFIG_PARAMETER, DEFAULT_CONFIG);
conf.addResource(configName);
}
}
/**
* Method provides access to local Configuration
*
* @return Configuration initialized with fault injection's parameters
*/
public static Configuration getConfig() {
return conf;
}
}

View File

@ -1,66 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fi;
import java.io.IOException;
import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
/** Helper methods and actions for hflush() fault injection tests */
public class FiHFlushTestUtil extends DataTransferTestUtil {
/** {@inheritDoc} */
public static PipelineTest initTest() {
return thepipelinetest = new HFlushTest();
}
/** Disk error action for fault injection tests */
public static class DerrAction extends DataTransferTestUtil.DataNodeAction {
/**
* @param currentTest The name of the test
* @param index The index of the datanode
*/
public DerrAction(String currentTest, int index) {
super(currentTest, index);
}
/** {@inheritDoc} */
public void run(DatanodeID id) throws IOException {
final Pipeline p = getPipelineTest().getPipelineForDatanode(id);
if (p == null) {
return;
}
if (p.contains(index, id)) {
final String s = super.toString(id);
FiTestUtil.LOG.info(s);
throw new DiskErrorException(s);
}
}
}
/** Class adds new type of action */
public static class HFlushTest extends DataTransferTest {
public final ActionContainer<DatanodeID, IOException> fiCallHFlush =
new ActionContainer<DatanodeID, IOException>();
public final ActionContainer<Integer, RuntimeException> fiErrorOnCallHFlush =
new ActionContainer<Integer, RuntimeException>();
}
}

View File

@ -1,209 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fi;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** Test Utilities */
public class FiTestUtil {
/** Logging */
public static final Log LOG = LogFactory.getLog(FiTestUtil.class);
/** Random source */
public static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
protected Random initialValue() {
final Random r = new Random();
final long seed = r.nextLong();
LOG.info(Thread.currentThread() + ": seed=" + seed);
r.setSeed(seed);
return r;
}
};
/**
* Return a random integer uniformly distributed over the interval [min,max).
*/
public static int nextRandomInt(final int min, final int max) {
final int d = max - min;
if (d <= 0) {
throw new IllegalArgumentException("d <= 0, min=" + min + ", max=" + max);
}
return d == 1? min: min + RANDOM.get().nextInt(d);
}
/**
* Return a random integer, with type long,
* uniformly distributed over the interval [min,max).
* Assume max - min <= Integer.MAX_VALUE.
*/
public static long nextRandomLong(final long min, final long max) {
final long d = max - min;
if (d <= 0 || d > Integer.MAX_VALUE) {
throw new IllegalArgumentException(
"d <= 0 || d > Integer.MAX_VALUE, min=" + min + ", max=" + max);
}
return d == 1? min: min + RANDOM.get().nextInt((int)d);
}
/** Return the method name of the callee. */
public static String getMethodName() {
final StackTraceElement[] s = Thread.currentThread().getStackTrace();
return s[s.length > 2? 2: s.length - 1].getMethodName();
}
/**
* Sleep.
* @return true if sleep exits normally; false if InterruptedException.
*/
public static boolean sleep(long ms) {
LOG.info("Sleep " + ms + " ms");
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
LOG.info("Sleep is interrupted", e);
return false;
}
return true;
}
/**
* Sleep a random number of milliseconds over the interval [min, max).
* If there is an InterruptedException, re-throw it as a RuntimeException.
*/
public static void sleep(final long min, final long max) {
final long n = nextRandomLong(min, max);
LOG.info(Thread.currentThread().getName() + " sleeps for " + n +"ms");
if (n > 0) {
sleep(n);
}
}
/** Action interface */
public static interface Action<T, E extends Exception> {
/** Run the action with the parameter. */
public void run(T parameter) throws E;
}
/** An ActionContainer contains at most one action. */
public static class ActionContainer<T, E extends Exception> {
private List<Action<T, E>> actionList = new ArrayList<Action<T, E>>();
/** Create an empty container. */
public ActionContainer() {}
/** Set action. */
public void set(Action<T, E> a) {actionList.add(a);}
/** Run the action if it exists. */
public void run(T obj) throws E {
for (Action<T, E> action : actionList) {
action.run(obj);
}
}
}
/** Constraint interface */
public static interface Constraint {
/** Is this constraint satisfied? */
public boolean isSatisfied();
}
/** Counting down, the constraint is satisfied if the count is one. */
public static class CountdownConstraint implements Constraint {
private int count;
/** Initialize the count. */
public CountdownConstraint(int count) {
if (count < 1) {
throw new IllegalArgumentException(count + " = count < 1");
}
this.count = count;
}
/** Counting down, the constraint is satisfied if the count is zero. */
public boolean isSatisfied() {
if (count > 1) {
count--;
return false;
}
return true;
}
}
/** An action is fired if all the constraints are satisfied. */
public static class ConstraintSatisfactionAction<T, E extends Exception>
implements Action<T, E> {
private final Action<T, E> action;
private final Constraint[] constraints;
/** Constructor */
public ConstraintSatisfactionAction(
Action<T, E> action, Constraint... constraints) {
this.action = action;
this.constraints = constraints;
}
/**
* Fire the action if all the constraints are satisfied.
* Short-circuit-and is used.
*/
@Override
public final void run(T parameter) throws E {
for(Constraint c : constraints) {
if (!c.isSatisfied()) {
return;
}
}
//all constraints are satisfied, fire the action
action.run(parameter);
}
}
/** A MarkerConstraint is satisfied if it is marked. */
public static class MarkerConstraint implements Constraint {
private final String name;
private boolean marked = false;
/** Construct an object. */
public MarkerConstraint(String name) {
this.name = name;
}
/** Set marker to be marked. */
public void mark() {
marked = true;
LOG.info("Marking this " + this);
}
/** Is the marker marked? */
@Override
public boolean isSatisfied() {
return marked;
}
/** {@inheritDoc} */
public String toString() {
return getClass().getSimpleName() + "[" + name + ": " + marked + "]";
}
}
}

View File

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fi;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import java.util.List;
import java.util.ArrayList;
public class Pipeline {
private final List<String> datanodes = new ArrayList<String>();
Pipeline(LocatedBlock lb) {
for(DatanodeInfo d : lb.getLocations()) {
datanodes.add(d.getName());
}
}
/** Does the pipeline contains d? */
public boolean contains(DatanodeID d) {
return datanodes.contains(d.getName());
}
/** Does the pipeline contains d at the n th position? */
public boolean contains(int n, DatanodeID d) {
return d.getName().equals(datanodes.get(n));
}
@Override
public String toString() {
return getClass().getSimpleName() + datanodes;
}
}

View File

@ -1,27 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fi;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
/** A pipeline contains a list of datanodes. */
public interface PipelineTest {
public Pipeline initPipeline(LocatedBlock lb);
public Pipeline getPipelineForDatanode(DatanodeID id);
}

View File

@ -1,107 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fi;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
/**
* This class is responsible for the decision of when a fault
* has to be triggered within a class of Hadoop
*
* Default probability of injection is set to 0%. To change it
* one can set the sys. prop. -Dfi.*=<new probability level>
* Another way to do so is to set this level through FI config file,
* located under src/test/fi-site.conf
*
* To change the level one has to specify the following sys,prop.:
* -Dfi.<name of fault location>=<probability level> in the runtime
* Probability level is specified by a float between 0.0 and 1.0
*
* <name of fault location> might be represented by a short classname
* or otherwise. This decision is left up to the discretion of aspects
* developer, but has to be consistent through the code
*/
public class ProbabilityModel {
private static Random generator = new Random();
private static final Log LOG = LogFactory.getLog(ProbabilityModel.class);
static final String FPROB_NAME = "fi.";
private static final String ALL_PROBABILITIES = FPROB_NAME + "*";
private static final float DEFAULT_PROB = 0.00f; //Default probability is 0%
private static final float MAX_PROB = 1.00f; // Max probability is 100%
private static Configuration conf = FiConfig.getConfig();
static {
// Set new default probability if specified through a system.property
// If neither is specified set default probability to DEFAULT_PROB
conf.set(ALL_PROBABILITIES,
System.getProperty(ALL_PROBABILITIES,
conf.get(ALL_PROBABILITIES, Float.toString(DEFAULT_PROB))));
LOG.info(ALL_PROBABILITIES + "=" + conf.get(ALL_PROBABILITIES));
}
/**
* Simplistic method to check if we have reached the point of injection
* @param klassName is the name of the probability level to check.
* If a configuration has been set for "fi.myClass" then you can check if the
* inject criteria has been reached by calling this method with "myClass"
* string as its parameter
* @return true if the probability threshold has been reached; false otherwise
*/
public static boolean injectCriteria(String klassName) {
boolean trigger = false;
// TODO fix this: make it more sophisticated!!!
if (generator.nextFloat() < getProbability(klassName)) {
trigger = true;
}
return trigger;
}
/**
* This primitive checks for arbitrary set of desired probability. If the
* level hasn't been set method will return default setting.
* The probability expected to be set as an float between 0.0 and 1.0
* @param klass is the name of the resource
* @return float representation of configured probability level of
* the requested resource or default value if hasn't been set
*/
protected static float getProbability(final String klass) {
String newProbName = FPROB_NAME + klass;
String newValue = System.getProperty(newProbName, conf.get(ALL_PROBABILITIES));
if (newValue != null && !newValue.equals(conf.get(newProbName)))
conf.set(newProbName, newValue);
float ret = conf.getFloat(newProbName,
conf.getFloat(ALL_PROBABILITIES, DEFAULT_PROB));
if(LOG.isDebugEnabled()) {
LOG.debug("Request for " + newProbName + " returns=" + ret);
}
// Make sure that probability level is valid.
if (ret < DEFAULT_PROB || ret > MAX_PROB)
ret = conf.getFloat(ALL_PROBABILITIES, DEFAULT_PROB);
return ret;
}
}

View File

@ -1,105 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* A large directory listing may have to go through multiple RPCs.
* The directory to be listed may be removed before all contents are listed.
*
* This test uses AspectJ to simulate the scenario.
*/
public class TestFiListPath {
private static final Log LOG = LogFactory.getLog(TestFiListPath.class);
private static final int LIST_LIMIT = 1;
private static MiniDFSCluster cluster = null;
private static FileSystem fs;
private static Path TEST_PATH = new Path("/tmp");
@BeforeClass
public static void setup() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, LIST_LIMIT);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitClusterUp();
fs = cluster.getFileSystem();
}
@AfterClass
public static void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
}
@Before
public void prepare() throws IOException {
fs.mkdirs(TEST_PATH);
for (int i=0; i<LIST_LIMIT+1; i++) {
fs.mkdirs(new Path(TEST_PATH, "dir"+i));
}
}
@After
public void cleanup() throws IOException {
fs.delete(TEST_PATH, true);
}
/** Remove the target directory after the getListing RPC */
@Test
public void testTargetDeletionForListStatus() throws Exception {
LOG.info("Test Target Delete For listStatus");
try {
fs.listStatus(TEST_PATH);
fail("Test should fail with FileNotFoundException");
} catch (FileNotFoundException e) {
assertEquals("File " + TEST_PATH + " does not exist.", e.getMessage());
LOG.info(StringUtils.stringifyException(e));
}
}
/** Remove the target directory after the getListing RPC */
@Test
public void testTargetDeletionForListLocatedStatus() throws Exception {
LOG.info("Test Target Delete For listLocatedStatus");
RemoteIterator<LocatedFileStatus> itor = fs.listLocatedStatus(TEST_PATH);
itor.next();
assertFalse (itor.hasNext());
}
}

View File

@ -1,272 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.test.PathUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.fs.FileContextTestHelper.*;
/**
* Rename names src to dst. Rename is done using following steps:
* <ul>
* <li>Checks are made to ensure src exists and appropriate flags are being
* passed to overwrite existing destination.
* <li>src is removed.
* <li>dst if it exists is removed.
* <li>src is renamed and added to directory tree as dst.
* </ul>
*
* During any of the above steps, the state of src and dst is reverted back to
* what it was prior to rename. This test ensures that the state is reverted
* back.
*
* This test uses AspectJ to simulate failures.
*/
public class TestFiRename {
private static final Log LOG = LogFactory.getLog(TestFiRename.class);
private static String removeChild = "";
private static String addChild = "";
private static byte[] data = { 0 };
private static String TEST_ROOT_DIR = PathUtils.getTestDirName(TestFiRename.class);
private static Configuration CONF = new Configuration();
static {
CONF.setInt("io.bytes.per.checksum", 1);
}
private MiniDFSCluster cluster = null;
private FileContext fc = null;
@Before
public void setup() throws IOException {
restartCluster(true);
}
@After
public void teardown() throws IOException {
if (fc != null) {
fc.delete(getTestRootPath(), true);
}
if (cluster != null) {
cluster.shutdown();
}
}
private void restartCluster(boolean format) throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
cluster = new MiniDFSCluster.Builder(CONF).format(format).build();
cluster.waitClusterUp();
fc = FileContext.getFileContext(cluster.getURI(0), CONF);
}
/**
* Returns true to indicate an exception should be thrown to simulate failure
* during removal of a node from directory tree.
*/
public static boolean throwExceptionOnRemove(String child) {
boolean status = removeChild.endsWith(child);
if (status) {
removeChild = "";
}
return status;
}
/**
* Returns true to indicate an exception should be thrown to simulate failure
* during addition of a node to directory tree.
*/
public static boolean throwExceptionOnAdd(String child) {
boolean status = addChild.endsWith(child);
if (status) {
addChild = "";
}
return status;
}
/** Set child name on removal of which failure should be simulated */
public static void exceptionOnRemove(String child) {
removeChild = child;
addChild = "";
}
/** Set child name on addition of which failure should be simulated */
public static void exceptionOnAdd(String child) {
removeChild = "";
addChild = child;
}
private Path getTestRootPath() {
return fc.makeQualified(new Path(TEST_ROOT_DIR));
}
private Path getTestPath(String pathString) {
return fc.makeQualified(new Path(TEST_ROOT_DIR, pathString));
}
private void createFile(Path path) throws IOException {
FSDataOutputStream out = fc.create(path, EnumSet.of(CreateFlag.CREATE),
Options.CreateOpts.createParent());
out.write(data, 0, data.length);
out.close();
}
/** Rename test when src exists and dst does not */
@Test
public void testFailureNonExistentDst() throws Exception {
final Path src = getTestPath("testFailureNonExistenSrc/dir/src");
final Path dst = getTestPath("testFailureNonExistenSrc/newdir/dst");
createFile(src);
// During rename, while removing src, an exception is thrown
TestFiRename.exceptionOnRemove(src.toString());
rename(src, dst, true, true, false, Rename.NONE);
// During rename, while adding dst an exception is thrown
TestFiRename.exceptionOnAdd(dst.toString());
rename(src, dst, true, true, false, Rename.NONE);
}
/** Rename test when src and dst exist */
@Test
public void testFailuresExistingDst() throws Exception {
final Path src = getTestPath("testFailuresExistingDst/dir/src");
final Path dst = getTestPath("testFailuresExistingDst/newdir/dst");
createFile(src);
createFile(dst);
// During rename, while removing src, an exception is thrown
TestFiRename.exceptionOnRemove(src.toString());
rename(src, dst, true, true, true, Rename.OVERWRITE);
// During rename, while removing dst, an exception is thrown
TestFiRename.exceptionOnRemove(dst.toString());
rename(src, dst, true, true, true, Rename.OVERWRITE);
// During rename, while adding dst an exception is thrown
TestFiRename.exceptionOnAdd(dst.toString());
rename(src, dst, true, true, true, Rename.OVERWRITE);
}
/** Rename test where both src and dst are files */
@Test
public void testDeletionOfDstFile() throws Exception {
Path src = getTestPath("testDeletionOfDstFile/dir/src");
Path dst = getTestPath("testDeletionOfDstFile/newdir/dst");
createFile(src);
createFile(dst);
final FSNamesystem namesystem = cluster.getNamesystem();
final long blocks = namesystem.getBlocksTotal();
final long fileCount = namesystem.getFilesTotal();
rename(src, dst, false, false, true, Rename.OVERWRITE);
// After successful rename the blocks corresponing dst are deleted
Assert.assertEquals(blocks - 1, namesystem.getBlocksTotal());
// After successful rename dst file is deleted
Assert.assertEquals(fileCount - 1, namesystem.getFilesTotal());
// Restart the cluster to ensure new rename operation
// recorded in editlog is processed right
restartCluster(false);
int count = 0;
boolean exception = true;
src = getTestPath("testDeletionOfDstFile/dir/src");
dst = getTestPath("testDeletionOfDstFile/newdir/dst");
while (exception && count < 5) {
try {
exists(fc, src);
exception = false;
} catch (Exception e) {
LOG.warn("Exception " + " count " + count + " " + e.getMessage());
Thread.sleep(1000);
count++;
}
}
Assert.assertFalse(exists(fc, src));
Assert.assertTrue(exists(fc, dst));
}
/** Rename test where both src and dst are directories */
@Test
public void testDeletionOfDstDirectory() throws Exception {
Path src = getTestPath("testDeletionOfDstDirectory/dir/src");
Path dst = getTestPath("testDeletionOfDstDirectory/newdir/dst");
fc.mkdir(src, FileContext.DEFAULT_PERM, true);
fc.mkdir(dst, FileContext.DEFAULT_PERM, true);
FSNamesystem namesystem = cluster.getNamesystem();
long fileCount = namesystem.getFilesTotal();
rename(src, dst, false, false, true, Rename.OVERWRITE);
// After successful rename dst directory is deleted
Assert.assertEquals(fileCount - 1, namesystem.getFilesTotal());
// Restart the cluster to ensure new rename operation
// recorded in editlog is processed right
restartCluster(false);
src = getTestPath("testDeletionOfDstDirectory/dir/src");
dst = getTestPath("testDeletionOfDstDirectory/newdir/dst");
int count = 0;
boolean exception = true;
while (exception && count < 5) {
try {
exists(fc, src);
exception = false;
} catch (Exception e) {
LOG.warn("Exception " + " count " + count + " " + e.getMessage());
Thread.sleep(1000);
count++;
}
}
Assert.assertFalse(exists(fc, src));
Assert.assertTrue(exists(fc, dst));
}
private void rename(Path src, Path dst, boolean exception, boolean srcExists,
boolean dstExists, Rename... options) throws IOException {
try {
fc.rename(src, dst, options);
Assert.assertFalse("Expected exception is not thrown", exception);
} catch (Exception e) {
LOG.warn("Exception ", e);
Assert.assertTrue(exception);
}
Assert.assertEquals(srcExists, exists(fc, src));
Assert.assertEquals(dstExists, exists(fc, dst));
}
}

View File

@ -1,103 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.PipelineTest;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSOutputStream.DataStreamer;
import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
import org.junit.Assert;
/** Aspects for DFSClient */
privileged public aspect DFSClientAspects {
public static final Log LOG = LogFactory.getLog(DFSClientAspects.class);
pointcut callCreateBlockOutputStream(DataStreamer datastreamer):
call(* createBlockOutputStream(..)) && target(datastreamer);
before(DataStreamer datastreamer) : callCreateBlockOutputStream(datastreamer) {
Assert.assertFalse(datastreamer.hasError);
Assert.assertEquals(-1, datastreamer.errorIndex);
}
pointcut pipelineInitNonAppend(DataStreamer datastreamer):
callCreateBlockOutputStream(datastreamer)
&& cflow(execution(* nextBlockOutputStream(..)))
&& within(DataStreamer);
after(DataStreamer datastreamer) returning : pipelineInitNonAppend(datastreamer) {
LOG.info("FI: after pipelineInitNonAppend: hasError="
+ datastreamer.hasError + " errorIndex=" + datastreamer.errorIndex);
if (datastreamer.hasError) {
DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
if (dtTest != null)
dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
}
}
pointcut pipelineInitAppend(DataStreamer datastreamer):
callCreateBlockOutputStream(datastreamer)
&& cflow(execution(* initAppend(..)))
&& within(DataStreamer);
after(DataStreamer datastreamer) returning : pipelineInitAppend(datastreamer) {
LOG.info("FI: after pipelineInitAppend: hasError=" + datastreamer.hasError
+ " errorIndex=" + datastreamer.errorIndex);
}
pointcut pipelineErrorAfterInit(DataStreamer datastreamer):
call(* processDatanodeError())
&& within (DFSOutputStream.DataStreamer)
&& target(datastreamer);
before(DataStreamer datastreamer) : pipelineErrorAfterInit(datastreamer) {
LOG.info("FI: before pipelineErrorAfterInit: errorIndex="
+ datastreamer.errorIndex);
DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
if (dtTest != null )
dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
}
pointcut pipelineClose(DFSOutputStream out):
call(void flushInternal())
&& withincode (void DFSOutputStream.close())
&& this(out);
before(DFSOutputStream out) : pipelineClose(out) {
LOG.info("FI: before pipelineClose:");
}
pointcut checkAckQueue(DFSOutputStream stream):
call (void DFSOutputStream.waitAndQueueCurrentPacket())
&& withincode (void DFSOutputStream.writeChunk(..))
&& this(stream);
after(DFSOutputStream stream) : checkAckQueue (stream) {
DFSOutputStream.Packet cp = stream.currentPacket;
PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
if (pTest != null && pTest instanceof PipelinesTest) {
LOG.debug("FI: Recording packet # " + cp.seqno
+ " where queuing has occurred");
((PipelinesTest) pTest).setVerified(cp.seqno);
}
}
}

View File

@ -1,64 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.PipelineTest;
import org.apache.hadoop.fi.FiHFlushTestUtil.HFlushTest;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
public aspect HFlushAspects {
public static final Log LOG = LogFactory.getLog(HFlushAspects.class);
pointcut hflushCall (DFSOutputStream outstream) :
execution(void DFSOutputStream.hflush(..))
&& target (outstream);
/** This advise is suppose to initiate a call to the action (fiCallHFlush)
* which will throw DiskErrorException if a pipeline has been created
* and datanodes used are belong to that very pipeline
*/
after (DFSOutputStream streamer) throws IOException : hflushCall(streamer) {
LOG.info("FI: hflush for any datanode");
LOG.info("FI: hflush " + thisJoinPoint.getThis());
DatanodeInfo[] nodes = streamer.getPipeline();
if (nodes == null) {
LOG.info("No pipeline is built");
return;
}
PipelineTest pt = DataTransferTestUtil.getPipelineTest();
if (pt == null) {
LOG.info("No test has been initialized");
return;
}
if (pt instanceof HFlushTest)
for (int i=0; i<nodes.length; i++) {
try {
((HFlushTest)pt).fiCallHFlush.run(nodes[i]);
} catch (IOException ioe) {
((HFlushTest)pt).fiErrorOnCallHFlush.run(i);
}
}
}
}

View File

@ -1,153 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.LinkedList;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.FiTestUtil;
import org.apache.hadoop.fi.PipelineTest;
import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
public class PipelinesTestUtil extends DataTransferTestUtil {
/**
* {@inheritDoc}
*/
public static PipelineTest initTest() {
return thepipelinetest = new PipelinesTest();
}
/**
* Storing acknowleged bytes num. action for fault injection tests
*/
public static class ReceivedCheckAction implements FiTestUtil.Action<NodeBytes, IOException> {
String name;
LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
/**
* @param name of the test
*/
public ReceivedCheckAction(String name) {
this.name = name;
}
@Override
public void run(NodeBytes nb) throws IOException {
synchronized (rcv) {
rcv.add(nb);
for (NodeBytes n : rcv) {
long counterPartsBytes = -1;
NodeBytes counterPart = null;
if (ack.size() > rcv.indexOf(n)) {
counterPart = ack.get(rcv.indexOf(n));
counterPartsBytes = counterPart.bytes;
}
assertTrue("FI: Wrong receiving length",
counterPartsBytes <= n.bytes);
if(FiTestUtil.LOG.isDebugEnabled()) {
FiTestUtil.LOG.debug("FI: before compare of Recv bytes. Expected "
+ n.bytes + ", got " + counterPartsBytes);
}
}
}
}
}
/**
* Storing acknowleged bytes num. action for fault injection tests
*/
public static class AckedCheckAction implements FiTestUtil.Action<NodeBytes, IOException> {
String name;
LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
/**
* @param name of the test
*/
public AckedCheckAction(String name) {
this.name = name;
}
/**
* {@inheritDoc}
*/
public void run(NodeBytes nb) throws IOException {
synchronized (ack) {
ack.add(nb);
for (NodeBytes n : ack) {
NodeBytes counterPart = null;
long counterPartsBytes = -1;
if (rcv.size() > ack.indexOf(n)) {
counterPart = rcv.get(ack.indexOf(n));
counterPartsBytes = counterPart.bytes;
}
assertTrue("FI: Wrong acknowledged length",
counterPartsBytes == n.bytes);
if(FiTestUtil.LOG.isDebugEnabled()) {
FiTestUtil.LOG.debug(
"FI: before compare of Acked bytes. Expected " +
n.bytes + ", got " + counterPartsBytes);
}
}
}
}
}
/**
* Class adds new types of action
*/
public static class PipelinesTest extends DataTransferTest {
LinkedList<NodeBytes> received = new LinkedList<NodeBytes>();
LinkedList<NodeBytes> acked = new LinkedList<NodeBytes>();
public final ActionContainer<NodeBytes, IOException> fiCallSetNumBytes =
new ActionContainer<NodeBytes, IOException>();
public final ActionContainer<NodeBytes, IOException> fiCallSetBytesAcked =
new ActionContainer<NodeBytes, IOException>();
private static boolean suspend = false;
private static long lastQueuedPacket = -1;
public void setSuspend(boolean flag) {
suspend = flag;
}
public boolean getSuspend () {
return suspend;
}
public void setVerified(long packetNum) {
PipelinesTest.lastQueuedPacket = packetNum;
}
public long getLastQueued() {
return lastQueuedPacket;
}
}
public static class NodeBytes {
DatanodeID id;
long bytes;
public NodeBytes(DatanodeID id, long bytes) {
this.id = id;
this.bytes = bytes;
}
}
}

View File

@ -1,180 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.FiHFlushTestUtil;
import org.apache.hadoop.fi.FiHFlushTestUtil.DerrAction;
import org.apache.hadoop.fi.FiHFlushTestUtil.HFlushTest;
import org.apache.hadoop.fi.FiTestUtil;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import java.io.IOException;
/** Class provides basic fault injection tests according to the test plan
of HDFS-265
*/
public class TestFiHFlush {
/** Methods initializes a test and sets required actions to be used later by
* an injected advice
* @param conf mini cluster configuration
* @param methodName String representation of a test method invoking this
* method
* @param block_size needed size of file's block
* @param a is an action to be set for the set
* @throws IOException in case of any errors
*/
private static void runDiskErrorTest (final Configuration conf,
final String methodName, final int block_size, DerrAction a, int index,
boolean trueVerification)
throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final HFlushTest hft = (HFlushTest) FiHFlushTestUtil.initTest();
hft.fiCallHFlush.set(a);
hft.fiErrorOnCallHFlush.set(new DataTransferTestUtil.VerificationAction(methodName, index));
TestHFlush.doTheJob(conf, methodName, block_size, (short)3);
if (trueVerification)
assertTrue("Some of expected conditions weren't detected", hft.isSuccess());
}
/** The tests calls
* {@link #runDiskErrorTest(Configuration, String, int, DerrAction, int, boolean)}
* to make a number of writes within a block boundaries.
* Although hflush() is called the test shouldn't expect an IOException
* in this case because the invocation is happening after write() call
* is complete when pipeline doesn't exist anymore.
* Thus, injected fault won't be triggered for 0th datanode
*/
@Test
public void hFlushFi01_a() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runDiskErrorTest(new HdfsConfiguration(), methodName,
AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 0), 0, false);
}
/** The tests calls
* {@link #runDiskErrorTest(Configuration, String, int, DerrAction, int, boolean)}
* to make a number of writes across a block boundaries.
* hflush() is called after each write() during a pipeline life time.
* Thus, injected fault ought to be triggered for 0th datanode
*/
@Test
public void hFlushFi01_b() throws IOException {
final String methodName = FiTestUtil.getMethodName();
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 512;
int customBlockSize = customPerChecksumSize * 3;
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
runDiskErrorTest(conf, methodName,
customBlockSize, new DerrAction(methodName, 0), 0, true);
}
/** Similar to {@link #hFlushFi01_b()} but writing happens
* across block and checksum's boundaries
*/
@Test
public void hFlushFi01_c() throws Exception {
final String methodName = FiTestUtil.getMethodName();
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 400;
int customBlockSize = customPerChecksumSize * 3;
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
runDiskErrorTest(conf, methodName,
customBlockSize, new DerrAction(methodName, 0), 0, true);
}
/** Similar to {@link #hFlushFi01_a()} but for a pipeline's 1st datanode
*/
@Test
public void hFlushFi02_a() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runDiskErrorTest(new HdfsConfiguration(), methodName,
AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 1), 1, false);
}
/** Similar to {@link #hFlushFi01_b()} but for a pipeline's 1st datanode
*/
@Test
public void hFlushFi02_b() throws IOException {
final String methodName = FiTestUtil.getMethodName();
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 512;
int customBlockSize = customPerChecksumSize * 3;
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
runDiskErrorTest(conf, methodName,
customBlockSize, new DerrAction(methodName, 1), 1, true);
}
/** Similar to {@link #hFlushFi01_c()} but for a pipeline's 1st datanode
*/
@Test
public void hFlushFi02_c() throws IOException {
final String methodName = FiTestUtil.getMethodName();
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 400;
int customBlockSize = customPerChecksumSize * 3;
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
runDiskErrorTest(conf, methodName,
customBlockSize, new DerrAction(methodName, 1), 1, true);
}
/** Similar to {@link #hFlushFi01_a()} but for a pipeline's 2nd datanode
*/
@Test
public void hFlushFi03_a() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runDiskErrorTest(new HdfsConfiguration(), methodName,
AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 2), 2, false);
}
/** Similar to {@link #hFlushFi01_b()} but for a pipeline's 2nd datanode
*/
@Test
public void hFlushFi03_b() throws IOException {
final String methodName = FiTestUtil.getMethodName();
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 512;
int customBlockSize = customPerChecksumSize * 3;
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
runDiskErrorTest(conf, methodName,
customBlockSize, new DerrAction(methodName, 2), 2, true);
}
/** Similar to {@link #hFlushFi01_c()} but for a pipeline's 2nd datanode
*/
@Test
public void hFlushFi03_c() throws IOException {
final String methodName = FiTestUtil.getMethodName();
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 400;
int customBlockSize = customPerChecksumSize * 3;
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
runDiskErrorTest(conf, methodName,
customBlockSize, new DerrAction(methodName, 2), 2, true);
}
}

View File

@ -1,247 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fi.FiTestUtil;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.BlockReceiverAspects;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestFiPipelines {
public static final Log LOG = LogFactory.getLog(TestFiPipelines.class);
private static short REPL_FACTOR = 3;
private static final int RAND_LIMIT = 2000;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private static Configuration conf;
Random rand = new Random(RAND_LIMIT);
static {
initLoggers();
setConfiguration();
}
@Before
public void startUpCluster() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
fs = (DistributedFileSystem) cluster.getFileSystem();
}
@After
synchronized public void shutDownCluster() throws IOException {
if (cluster != null) cluster.shutdown();
}
/**
* Test initiates and sets actions created by injection framework. The actions
* work with both aspects of sending acknologment packets in a pipeline.
* Creates and closes a file of certain length < packet size.
* Injected actions will check if number of visible bytes at datanodes equals
* to number of acknoleged bytes
*
* @throws IOException in case of an error
*/
@Test
public void pipeline_04() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + METHOD_NAME);
}
final PipelinesTestUtil.PipelinesTest pipst =
(PipelinesTestUtil.PipelinesTest) PipelinesTestUtil.initTest();
pipst.fiCallSetNumBytes.set(new PipelinesTestUtil.ReceivedCheckAction(METHOD_NAME));
pipst.fiCallSetBytesAcked.set(new PipelinesTestUtil.AckedCheckAction(METHOD_NAME));
Path filePath = new Path("/" + METHOD_NAME + ".dat");
FSDataOutputStream fsOut = fs.create(filePath);
TestPipelines.writeData(fsOut, 2);
fs.close();
}
/**
* Similar to pipeline_04 but sends many packets into a pipeline
* @throws IOException in case of an error
*/
@Test
public void pipeline_05() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + METHOD_NAME);
}
final PipelinesTestUtil.PipelinesTest pipst =
(PipelinesTestUtil.PipelinesTest) PipelinesTestUtil.initTest();
pipst.fiCallSetNumBytes.set(new PipelinesTestUtil.ReceivedCheckAction(METHOD_NAME));
pipst.fiCallSetBytesAcked.set(new PipelinesTestUtil.AckedCheckAction(METHOD_NAME));
Path filePath = new Path("/" + METHOD_NAME + ".dat");
FSDataOutputStream fsOut = fs.create(filePath);
for (int i = 0; i < 17; i++) {
TestPipelines.writeData(fsOut, 23);
}
fs.close();
}
/**
* This quite tricky test prevents acknowledgement packets from a datanode
* This should block any write attempts after ackQueue is full.
* Test is blocking, so the MiniDFSCluster has to be killed harshly.
* @throws IOException in case of an error
*/
@Test
public void pipeline_06() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int MAX_PACKETS = 80;
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + METHOD_NAME);
}
final PipelinesTestUtil.PipelinesTest pipst =
(PipelinesTestUtil.PipelinesTest) PipelinesTestUtil.initTest();
pipst.setSuspend(true); // This is ack. suspend test
Path filePath = new Path("/" + METHOD_NAME + ".dat");
FSDataOutputStream fsOut = fs.create(filePath);
int cnt = 0;
try {
// At this point let's start an external checker thread, which will
// verify the test's results and shutdown the MiniDFSCluster for us,
// because what it's gonna do has BLOCKING effect on datanodes
QueueChecker cq = new QueueChecker(pipst, MAX_PACKETS);
cq.start();
// The following value is explained by the fact that size of a packet isn't
// necessary equals to the value of
// DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY
// The actual logic is expressed in DFSClient#computePacketChunkSize
int bytesToSend = 700;
while (cnt < 100 && pipst.getSuspend()) {
if(LOG.isDebugEnabled()) {
LOG.debug("_06(): " + cnt++ + " sending another " +
bytesToSend + " bytes");
}
TestPipelines.writeData(fsOut, bytesToSend);
}
} catch (Exception e) {
LOG.warn("Getting unexpected exception: ", e);
}
if(LOG.isDebugEnabled()) {
LOG.debug("Last queued packet number " + pipst.getLastQueued());
}
assertTrue("Shouldn't be able to send more than 81 packet", pipst.getLastQueued() <= 81);
}
private class QueueChecker extends Thread {
PipelinesTestUtil.PipelinesTest test;
final int MAX;
boolean done = false;
public QueueChecker(PipelinesTestUtil.PipelinesTest handle, int maxPackets) {
test = handle;
MAX = maxPackets;
}
@Override
public void run() {
while (!done) {
if(LOG.isDebugEnabled()) {
LOG.debug("_06: checking for the limit " + test.getLastQueued() +
" and " + MAX);
}
if (test.getLastQueued() >= MAX) {
if(LOG.isDebugEnabled()) {
LOG.debug("FI: Resume packets acking");
}
test.setSuspend(false); //Do not suspend ack sending any more
done = true;
}
if (!done)
try {
if(LOG.isDebugEnabled()) {
LOG.debug("_06: MAX isn't reached yet. Current=" +
test.getLastQueued());
}
sleep(100);
} catch (InterruptedException e) { }
}
assertTrue("Shouldn't be able to send more than 81 packet", test.getLastQueued() <= 81);
try {
if(LOG.isDebugEnabled()) {
LOG.debug("_06: shutting down the cluster");
}
// It has to be done like that, because local version of shutDownCluster()
// won't work, because it tries to close an instance of FileSystem too.
// Which is where the waiting is happening.
if (cluster !=null )
shutDownCluster();
} catch (Exception e) {
e.printStackTrace();
}
if(LOG.isDebugEnabled()) {
LOG.debug("End QueueChecker thread");
}
}
}
private static void setConfiguration() {
conf = new Configuration();
int customPerChecksumSize = 700;
int customBlockSize = customPerChecksumSize * 3;
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 100);
conf.setInt(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, customBlockSize / 2);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 0);
}
private static void initLoggers() {
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
GenericTestUtils.setLogLevel(LogFactory.getLog(FSNamesystem.class), Level.ALL);
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
GenericTestUtils.setLogLevel(TestFiPipelines.LOG, Level.ALL);
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
GenericTestUtils.setLogLevel(FiTestUtil.LOG, Level.ALL);
GenericTestUtils.setLogLevel(BlockReceiverAspects.LOG, Level.ALL);
GenericTestUtils.setLogLevel(DFSClientAspects.LOG, Level.ALL);
}
}

View File

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.PipelineTest;
/** Aspect for ClientProtocol */
public aspect ClientProtocolAspects {
public static final Log LOG = LogFactory.getLog(ClientProtocolAspects.class);
pointcut addBlock():
call(LocatedBlock ClientProtocol.addBlock(String, String,..));
after() returning(LocatedBlock lb): addBlock() {
PipelineTest pipelineTest = DataTransferTestUtil.getPipelineTest();
if (pipelineTest != null)
LOG.info("FI: addBlock "
+ pipelineTest.initPipeline(lb));
}
}

View File

@ -1,232 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.Pipeline;
import org.apache.hadoop.fi.PipelineTest;
import org.apache.hadoop.fi.ProbabilityModel;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
import org.apache.hadoop.hdfs.server.datanode.BlockReceiver.PacketResponder;
import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
/**
* This aspect takes care about faults injected into datanode.BlockReceiver
* class
*/
privileged public aspect BlockReceiverAspects {
public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
BlockReceiver BlockReceiver.PacketResponder.getReceiver(){
LOG.info("FI: getReceiver() " + getClass().getName());
return BlockReceiver.this;
}
pointcut callReceivePacket(BlockReceiver blockreceiver) :
call(* receivePacket(..)) && target(blockreceiver);
before(BlockReceiver blockreceiver
) throws IOException : callReceivePacket(blockreceiver) {
final String dnName = blockreceiver.getDataNode().getMachineName();
final DatanodeID dnId = blockreceiver.getDataNode().getDatanodeId();
LOG.info("FI: callReceivePacket, datanode=" + dnName);
DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
if (dtTest != null)
dtTest.fiCallReceivePacket.run(dnId);
if (ProbabilityModel.injectCriteria(BlockReceiver.class.getSimpleName())) {
LOG.info("Before the injection point");
Thread.dumpStack();
throw new DiskOutOfSpaceException ("FI: injected fault point at " +
thisJoinPoint.getStaticPart( ).getSourceLocation());
}
}
pointcut callWritePacketToDisk(BlockReceiver blockreceiver) :
call(* writePacketToDisk(..)) && target(blockreceiver);
before(BlockReceiver blockreceiver
) throws IOException : callWritePacketToDisk(blockreceiver) {
LOG.info("FI: callWritePacketToDisk");
DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
if (dtTest != null)
dtTest.fiCallWritePacketToDisk.run(
blockreceiver.getDataNode().getDatanodeId());
}
pointcut afterDownstreamStatusRead(BlockReceiver.PacketResponder responder):
call(void PipelineAck.readFields(InputStream)) && this(responder);
after(BlockReceiver.PacketResponder responder)
throws IOException: afterDownstreamStatusRead(responder) {
final DataNode d = responder.getReceiver().getDataNode();
DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
if (dtTest != null)
dtTest.fiAfterDownstreamStatusRead.run(d.getDatanodeId());
}
// Pointcuts and advises for TestFiPipelines
pointcut callSetNumBytes(BlockReceiver br, long offset) :
call (void ReplicaInPipelineInterface.setNumBytes(long))
&& withincode (int BlockReceiver.receivePacket(long, long, boolean, int, int))
&& args(offset)
&& this(br);
after(BlockReceiver br, long offset) : callSetNumBytes(br, offset) {
LOG.debug("FI: Received bytes To: " + br.datanode.getStorageId() + ": " + offset);
PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
if (pTest == null) {
LOG.debug("FI: no pipeline has been found in receiving");
return;
}
if (!(pTest instanceof PipelinesTest)) {
return;
}
NodeBytes nb = new NodeBytes(br.datanode.getDatanodeId(), offset);
try {
((PipelinesTest)pTest).fiCallSetNumBytes.run(nb);
} catch (IOException e) {
LOG.fatal("FI: no exception is expected here!");
}
}
// Pointcuts and advises for TestFiPipelines
pointcut callSetBytesAcked(PacketResponder pr, long acked) :
call (void ReplicaInPipelineInterface.setBytesAcked(long))
&& withincode (void PacketResponder.run())
&& args(acked)
&& this(pr);
after (PacketResponder pr, long acked) : callSetBytesAcked (pr, acked) {
PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
if (pTest == null) {
LOG.debug("FI: no pipeline has been found in acking");
return;
}
LOG.debug("FI: Acked total bytes from: " +
pr.getReceiver().datanode.getStorageId() + ": " + acked);
if (pTest instanceof PipelinesTest) {
bytesAckedService((PipelinesTest)pTest, pr, acked);
}
}
private void bytesAckedService
(final PipelinesTest pTest, final PacketResponder pr, final long acked) {
NodeBytes nb = new NodeBytes(pr.getReceiver().datanode.getDatanodeId(), acked);
try {
pTest.fiCallSetBytesAcked.run(nb);
} catch (IOException e) {
LOG.fatal("No exception should be happening at this point");
assert false;
}
}
pointcut preventAckSending () :
call (void PipelineAck.write(OutputStream))
&& within (PacketResponder);
static int ackCounter = 0;
void around () : preventAckSending () {
PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
if (pTest == null) {
LOG.debug("FI: remove first ack as expected");
proceed();
return;
}
if (!(pTest instanceof PipelinesTest)) {
LOG.debug("FI: remove first ack as expected");
proceed();
return;
}
if (((PipelinesTest)pTest).getSuspend()) {
LOG.debug("FI: suspend the ack");
return;
}
LOG.debug("FI: remove first ack as expected");
proceed();
}
// End of pointcuts and advises for TestFiPipelines
pointcut pipelineClose(BlockReceiver blockreceiver, long offsetInBlock, long seqno,
boolean lastPacketInBlock, int len, int endOfHeader) :
call (* BlockReceiver.receivePacket(long, long, boolean, int, int))
&& this(blockreceiver)
&& args(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
before(BlockReceiver blockreceiver, long offsetInBlock, long seqno,
boolean lastPacketInBlock, int len, int endOfHeader
) throws IOException : pipelineClose(blockreceiver, offsetInBlock, seqno,
lastPacketInBlock, len, endOfHeader) {
if (len == 0) {
final DatanodeID dnId = blockreceiver.getDataNode().getDatanodeId();
LOG.info("FI: pipelineClose, datanode=" + dnId.getName()
+ ", offsetInBlock=" + offsetInBlock
+ ", seqno=" + seqno
+ ", lastPacketInBlock=" + lastPacketInBlock
+ ", len=" + len
+ ", endOfHeader=" + endOfHeader);
final DataTransferTest test = DataTransferTestUtil.getDataTransferTest();
if (test != null) {
test.fiPipelineClose.run(dnId);
}
}
}
pointcut pipelineAck(BlockReceiver.PacketResponder packetresponder) :
call (void PipelineAck.readFields(InputStream))
&& this(packetresponder);
after(BlockReceiver.PacketResponder packetresponder) throws IOException
: pipelineAck(packetresponder) {
final DatanodeID dnId = packetresponder.getReceiver().getDataNode().getDatanodeId();
LOG.info("FI: fiPipelineAck, datanode=" + dnId);
final DataTransferTest test = DataTransferTestUtil.getDataTransferTest();
if (test != null) {
test.fiPipelineAck.run(dnId);
}
}
pointcut blockFileClose(BlockReceiver blockreceiver) :
call(void close())
&& withincode(void BlockReceiver.close())
&& this(blockreceiver);
after(BlockReceiver blockreceiver) throws IOException : blockFileClose(blockreceiver) {
final DatanodeID dnId = blockreceiver.getDataNode().getDatanodeId();
LOG.info("FI: blockFileClose, datanode=" + dnId);
final DataTransferTest test = DataTransferTestUtil.getDataTransferTest();
if (test != null) {
test.fiBlockFileClose.run(dnId);
}
}
}

View File

@ -1,80 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.InputStream;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
/** Aspect for DataTransferProtocol */
public aspect DataTransferProtocolAspects {
public static final Log LOG = LogFactory.getLog(
DataTransferProtocolAspects.class);
/*
{
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
}
*/
pointcut receiverOp(DataXceiver dataxceiver):
call(Op Receiver.readOp()) && target(dataxceiver);
after(DataXceiver dataxceiver) returning(Op op): receiverOp(dataxceiver) {
LOG.info("FI: receiverOp " + op + ", datanode="
+ dataxceiver.getDataNode().getDatanodeId().getName());
}
pointcut statusRead(DataXceiver dataxceiver):
call(BlockOpResponseProto BlockOpResponseProto.parseFrom(InputStream)) && this(dataxceiver);
after(DataXceiver dataxceiver) returning(BlockOpResponseProto status
) throws IOException: statusRead(dataxceiver) {
final DataNode d = dataxceiver.getDataNode();
LOG.info("FI: statusRead " + status + ", datanode="
+ d.getDatanodeId().getName());
DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
if (dtTest != null)
dtTest.fiStatusRead.run(d.getDatanodeId());
}
pointcut receiverOpWriteBlock(DataXceiver dataxceiver):
call(void Receiver.opWriteBlock(DataInputStream)) && target(dataxceiver);
before(DataXceiver dataxceiver
) throws IOException: receiverOpWriteBlock(dataxceiver) {
LOG.info("FI: receiverOpWriteBlock");
DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
if (dtTest != null)
dtTest.fiReceiverOpWriteBlock.run(
dataxceiver.getDataNode().getDatanodeId());
}
}

View File

@ -1,63 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.ProbabilityModel;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
import org.apache.hadoop.util.DiskChecker.*;
/**
* This aspect takes care about faults injected into datanode.FSDatase class
*/
public aspect FSDatasetAspects {
public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
pointcut execGetBlockFile() :
// the following will inject faults inside of the method in question
execution (* FSDataset.getBlockFile(..)) && !within(FSDatasetAspects +);
pointcut callCreateBlockWriteStream(ReplicaInPipeline repl) :
call (BlockWriteStreams createStreams(..))
&& target (repl)
&& !within(FSDatasetAspects +);
// This aspect specifies the logic of our fault point.
// In this case it simply throws DiskErrorException at the very beginning of
// invocation of the method, specified by callGetBlockFile() pointcut
before() throws DiskErrorException : execGetBlockFile() {
if (ProbabilityModel.injectCriteria(FSDataset.class.getSimpleName())) {
LOG.info("Before the injection point");
Thread.dumpStack();
throw new DiskErrorException("FI: injected fault point at "
+ thisJoinPoint.getStaticPart().getSourceLocation());
}
}
before(ReplicaInPipeline repl) throws DiskOutOfSpaceException : callCreateBlockWriteStream(repl) {
if (ProbabilityModel.injectCriteria(FSDataset.class.getSimpleName())) {
LOG.info("Before the injection point");
Thread.dumpStack();
throw new DiskOutOfSpaceException("FI: injected fault point at "
+ thisJoinPoint.getStaticPart().getSourceLocation());
}
}
}

View File

@ -1,312 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
import org.apache.hadoop.fi.DataTransferTestUtil.DoosAction;
import org.apache.hadoop.fi.DataTransferTestUtil.OomAction;
import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
import org.apache.hadoop.fi.FiTestUtil;
import org.apache.hadoop.fi.FiTestUtil.Action;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
/** Test DataTransferProtocol with fault injection. */
public class TestFiDataTransferProtocol {
static final short REPLICATION = 3;
static final long BLOCKSIZE = 1L * (1L << 20);
static final Configuration conf = new HdfsConfiguration();
static {
conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
}
static private FSDataOutputStream createFile(FileSystem fs, Path p
) throws IOException {
return fs.create(p, true,
fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY,
4096), REPLICATION, BLOCKSIZE);
}
{
GenericTestUtils.setLogLevel(DataTransferProtocol.LOG, Level.ALL);
}
/**
* 1. create files with dfs
* 2. write 1 byte
* 3. close file
* 4. open the same file
* 5. read the 1 byte and compare results
*/
static void write1byte(String methodName) throws IOException {
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
).numDataNodes(REPLICATION + 1).build();
final FileSystem dfs = cluster.getFileSystem();
try {
final Path p = new Path("/" + methodName + "/foo");
final FSDataOutputStream out = createFile(dfs, p);
out.write(1);
out.close();
final FSDataInputStream in = dfs.open(p);
final int b = in.read();
in.close();
Assert.assertEquals(1, b);
}
finally {
dfs.close();
cluster.shutdown();
}
}
private static void runSlowDatanodeTest(String methodName, SleepAction a
) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
t.fiCallReceivePacket.set(a);
t.fiReceiverOpWriteBlock.set(a);
t.fiStatusRead.set(a);
write1byte(methodName);
}
private static void runReceiverOpWriteBlockTest(String methodName,
int errorIndex, Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
t.fiReceiverOpWriteBlock.set(a);
t.fiPipelineInitErrorNonAppend.set(new VerificationAction(methodName,
errorIndex));
write1byte(methodName);
Assert.assertTrue(t.isSuccess());
}
private static void runStatusReadTest(String methodName, int errorIndex,
Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
t.fiStatusRead.set(a);
t.fiPipelineInitErrorNonAppend.set(new VerificationAction(methodName,
errorIndex));
write1byte(methodName);
Assert.assertTrue(t.isSuccess());
}
private static void runCallWritePacketToDisk(String methodName,
int errorIndex, Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
t.fiCallWritePacketToDisk.set(a);
t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, errorIndex));
write1byte(methodName);
Assert.assertTrue(t.isSuccess());
}
/**
* Pipeline setup:
* DN0 never responses after received setup request from client.
* Client gets an IOException and determine DN0 bad.
*/
@Test
public void pipeline_Fi_01() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runReceiverOpWriteBlockTest(methodName, 0, new SleepAction(methodName, 0, 0));
}
/**
* Pipeline setup:
* DN1 never responses after received setup request from client.
* Client gets an IOException and determine DN1 bad.
*/
@Test
public void pipeline_Fi_02() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runReceiverOpWriteBlockTest(methodName, 1, new SleepAction(methodName, 1, 0));
}
/**
* Pipeline setup:
* DN2 never responses after received setup request from client.
* Client gets an IOException and determine DN2 bad.
*/
@Test
public void pipeline_Fi_03() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runReceiverOpWriteBlockTest(methodName, 2, new SleepAction(methodName, 2, 0));
}
/**
* Pipeline setup, DN1 never responses after received setup ack from DN2.
* Client gets an IOException and determine DN1 bad.
*/
@Test
public void pipeline_Fi_04() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runStatusReadTest(methodName, 1, new SleepAction(methodName, 1, 0));
}
/**
* Pipeline setup, DN0 never responses after received setup ack from DN1.
* Client gets an IOException and determine DN0 bad.
*/
@Test
public void pipeline_Fi_05() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runStatusReadTest(methodName, 0, new SleepAction(methodName, 0, 0));
}
/**
* Pipeline setup with DN0 very slow but it won't lead to timeout.
* Client finishes setup successfully.
*/
@Test
public void pipeline_Fi_06() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runSlowDatanodeTest(methodName, new SleepAction(methodName, 0, 3000));
}
/**
* Pipeline setup with DN1 very slow but it won't lead to timeout.
* Client finishes setup successfully.
*/
@Test
public void pipeline_Fi_07() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runSlowDatanodeTest(methodName, new SleepAction(methodName, 1, 3000));
}
/**
* Pipeline setup with DN2 very slow but it won't lead to timeout.
* Client finishes setup successfully.
*/
@Test
public void pipeline_Fi_08() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runSlowDatanodeTest(methodName, new SleepAction(methodName, 2, 3000));
}
/**
* Pipeline setup, DN0 throws an OutOfMemoryException right after it
* received a setup request from client.
* Client gets an IOException and determine DN0 bad.
*/
@Test
public void pipeline_Fi_09() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runReceiverOpWriteBlockTest(methodName, 0, new OomAction(methodName, 0));
}
/**
* Pipeline setup, DN1 throws an OutOfMemoryException right after it
* received a setup request from DN0.
* Client gets an IOException and determine DN1 bad.
*/
@Test
public void pipeline_Fi_10() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runReceiverOpWriteBlockTest(methodName, 1, new OomAction(methodName, 1));
}
/**
* Pipeline setup, DN2 throws an OutOfMemoryException right after it
* received a setup request from DN1.
* Client gets an IOException and determine DN2 bad.
*/
@Test
public void pipeline_Fi_11() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runReceiverOpWriteBlockTest(methodName, 2, new OomAction(methodName, 2));
}
/**
* Pipeline setup, DN1 throws an OutOfMemoryException right after it
* received a setup ack from DN2.
* Client gets an IOException and determine DN1 bad.
*/
@Test
public void pipeline_Fi_12() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runStatusReadTest(methodName, 1, new OomAction(methodName, 1));
}
/**
* Pipeline setup, DN0 throws an OutOfMemoryException right after it
* received a setup ack from DN1.
* Client gets an IOException and determine DN0 bad.
*/
@Test
public void pipeline_Fi_13() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runStatusReadTest(methodName, 0, new OomAction(methodName, 0));
}
/**
* Streaming: Write a packet, DN0 throws a DiskOutOfSpaceError
* when it writes the data to disk.
* Client gets an IOException and determine DN0 bad.
*/
@Test
public void pipeline_Fi_14() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runCallWritePacketToDisk(methodName, 0, new DoosAction(methodName, 0));
}
/**
* Streaming: Write a packet, DN1 throws a DiskOutOfSpaceError
* when it writes the data to disk.
* Client gets an IOException and determine DN1 bad.
*/
@Test
public void pipeline_Fi_15() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runCallWritePacketToDisk(methodName, 1, new DoosAction(methodName, 1));
}
/**
* Streaming: Write a packet, DN2 throws a DiskOutOfSpaceError
* when it writes the data to disk.
* Client gets an IOException and determine DN2 bad.
*/
@Test
public void pipeline_Fi_16() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runCallWritePacketToDisk(methodName, 2, new DoosAction(methodName, 2));
}
}

View File

@ -1,289 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction;
import org.apache.hadoop.fi.DataTransferTestUtil.CountdownOomAction;
import org.apache.hadoop.fi.DataTransferTestUtil.CountdownSleepAction;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
import org.apache.hadoop.fi.FiTestUtil;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
/** Test DataTransferProtocol with fault injection. */
public class TestFiDataTransferProtocol2 {
static final short REPLICATION = 3;
static final long BLOCKSIZE = 1L * (1L << 20);
static final int PACKET_SIZE = 1024;
static final int MIN_N_PACKET = 3;
static final int MAX_N_PACKET = 10;
static final int MAX_SLEEP = 1000;
static final Configuration conf = new Configuration();
static {
conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, PACKET_SIZE);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
}
static final byte[] bytes = new byte[MAX_N_PACKET * PACKET_SIZE];
static final byte[] toRead = new byte[MAX_N_PACKET * PACKET_SIZE];
static private FSDataOutputStream createFile(FileSystem fs, Path p
) throws IOException {
return fs.create(p, true, fs.getConf()
.getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), REPLICATION, BLOCKSIZE);
}
{
GenericTestUtils.setLogLevel(BlockReceiver.LOG, Level.ALL);
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
GenericTestUtils.setLogLevel(DataTransferProtocol.LOG, Level.ALL);
}
/**
* 1. create files with dfs
* 2. write MIN_N_PACKET to MAX_N_PACKET packets
* 3. close file
* 4. open the same file
* 5. read the bytes and compare results
*/
private static void writeSeveralPackets(String methodName) throws IOException {
final Random r = FiTestUtil.RANDOM.get();
final int nPackets = FiTestUtil.nextRandomInt(MIN_N_PACKET, MAX_N_PACKET + 1);
final int lastPacketSize = FiTestUtil.nextRandomInt(1, PACKET_SIZE + 1);
final int size = (nPackets - 1)*PACKET_SIZE + lastPacketSize;
FiTestUtil.LOG.info("size=" + size + ", nPackets=" + nPackets
+ ", lastPacketSize=" + lastPacketSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
).numDataNodes(REPLICATION + 2).build();
final FileSystem dfs = cluster.getFileSystem();
try {
final Path p = new Path("/" + methodName + "/foo");
final FSDataOutputStream out = createFile(dfs, p);
final long seed = r.nextLong();
final Random ran = new Random(seed);
ran.nextBytes(bytes);
out.write(bytes, 0, size);
out.close();
final FSDataInputStream in = dfs.open(p);
int totalRead = 0;
int nRead = 0;
while ((nRead = in.read(toRead, totalRead, size - totalRead)) > 0) {
totalRead += nRead;
}
Assert.assertEquals("Cannot read file.", size, totalRead);
for (int i = 0; i < size; i++) {
Assert.assertTrue("File content differ.", bytes[i] == toRead[i]);
}
}
finally {
dfs.close();
cluster.shutdown();
}
}
private static void initSlowDatanodeTest(DataTransferTest t, SleepAction a)
throws IOException {
t.fiCallReceivePacket.set(a);
t.fiReceiverOpWriteBlock.set(a);
t.fiStatusRead.set(a);
}
private void runTest17_19(String methodName, int dnIndex)
throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, MAX_SLEEP));
initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, MAX_SLEEP));
initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, MAX_SLEEP));
t.fiCallWritePacketToDisk.set(new CountdownDoosAction(methodName, dnIndex, 3));
t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
writeSeveralPackets(methodName);
Assert.assertTrue(t.isSuccess());
}
private void runTest29_30(String methodName, int dnIndex) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, MAX_SLEEP));
initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, MAX_SLEEP));
initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, MAX_SLEEP));
t.fiAfterDownstreamStatusRead.set(new CountdownOomAction(methodName, dnIndex, 3));
t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
writeSeveralPackets(methodName);
Assert.assertTrue(t.isSuccess());
}
private void runTest34_35(String methodName, int dnIndex) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
t.fiAfterDownstreamStatusRead.set(new CountdownSleepAction(methodName, dnIndex, 0, 3));
t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
writeSeveralPackets(methodName);
Assert.assertTrue(t.isSuccess());
}
/**
* Streaming:
* Randomize datanode speed, write several packets,
* DN0 throws a DiskOutOfSpaceError when it writes the third packet to disk.
* Client gets an IOException and determines DN0 bad.
*/
@Test
public void pipeline_Fi_17() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runTest17_19(methodName, 0);
}
/**
* Streaming:
* Randomize datanode speed, write several packets,
* DN1 throws a DiskOutOfSpaceError when it writes the third packet to disk.
* Client gets an IOException and determines DN1 bad.
*/
@Test
public void pipeline_Fi_18() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runTest17_19(methodName, 1);
}
/**
* Streaming:
* Randomize datanode speed, write several packets,
* DN2 throws a DiskOutOfSpaceError when it writes the third packet to disk.
* Client gets an IOException and determines DN2 bad.
*/
@Test
public void pipeline_Fi_19() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runTest17_19(methodName, 2);
}
/**
* Streaming: Client writes several packets with DN0 very slow. Client
* finishes write successfully.
*/
@Test
public void pipeline_Fi_20() throws IOException {
final String methodName = FiTestUtil.getMethodName();
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
initSlowDatanodeTest(t, new SleepAction(methodName, 0, MAX_SLEEP));
writeSeveralPackets(methodName);
}
/**
* Streaming: Client writes several packets with DN1 very slow. Client
* finishes write successfully.
*/
@Test
public void pipeline_Fi_21() throws IOException {
final String methodName = FiTestUtil.getMethodName();
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
initSlowDatanodeTest(t, new SleepAction(methodName, 1, MAX_SLEEP));
writeSeveralPackets(methodName);
}
/**
* Streaming: Client writes several packets with DN2 very slow. Client
* finishes write successfully.
*/
@Test
public void pipeline_Fi_22() throws IOException {
final String methodName = FiTestUtil.getMethodName();
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
initSlowDatanodeTest(t, new SleepAction(methodName, 2, MAX_SLEEP));
writeSeveralPackets(methodName);
}
/**
* Streaming: Randomize datanode speed, write several packets, DN1 throws a
* OutOfMemoryException when it receives the ack of the third packet from DN2.
* Client gets an IOException and determines DN1 bad.
*/
@Test
public void pipeline_Fi_29() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runTest29_30(methodName, 1);
}
/**
* Streaming: Randomize datanode speed, write several packets, DN0 throws a
* OutOfMemoryException when it receives the ack of the third packet from DN1.
* Client gets an IOException and determines DN0 bad.
*/
@Test
public void pipeline_Fi_30() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runTest29_30(methodName, 0);
}
/**
* Streaming: Write several packets, DN1 never responses when it receives the
* ack of the third packet from DN2. Client gets an IOException and determines
* DN1 bad.
*/
@Test
public void pipeline_Fi_34() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runTest34_35(methodName, 1);
}
/**
* Streaming: Write several packets, DN0 never responses when it receives the
* ack of the third packet from DN1. Client gets an IOException and determines
* DN0 bad.
*/
@Test
public void pipeline_Fi_35() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runTest34_35(methodName, 0);
}
}

View File

@ -1,250 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.DataTransferTestUtil.DataNodeAction;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
import org.apache.hadoop.fi.DataTransferTestUtil.DatanodeMarkingAction;
import org.apache.hadoop.fi.DataTransferTestUtil.IoeAction;
import org.apache.hadoop.fi.DataTransferTestUtil.OomAction;
import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
import org.apache.hadoop.fi.FiTestUtil;
import org.apache.hadoop.fi.FiTestUtil.Action;
import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.junit.Test;
/** Test DataTransferProtocol with fault injection. */
public class TestFiPipelineClose {
private static void runPipelineCloseTest(String methodName,
Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
t.fiPipelineClose.set(a);
TestFiDataTransferProtocol.write1byte(methodName);
}
/**
* Pipeline close:
* DN0 never responses after received close request from client.
* Client gets an IOException and determine DN0 bad.
*/
@Test
public void pipeline_Fi_36() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runPipelineCloseTest(methodName, new SleepAction(methodName, 0, 0));
}
/**
* Pipeline close:
* DN1 never responses after received close request from client.
* Client gets an IOException and determine DN1 bad.
*/
@Test
public void pipeline_Fi_37() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runPipelineCloseTest(methodName, new SleepAction(methodName, 1, 0));
}
/**
* Pipeline close:
* DN2 never responses after received close request from client.
* Client gets an IOException and determine DN2 bad.
*/
@Test
public void pipeline_Fi_38() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runPipelineCloseTest(methodName, new SleepAction(methodName, 2, 0));
}
private static void run41_43(String name, int i) throws IOException {
runPipelineCloseTest(name, new SleepAction(name, i, 3000));
}
private static void runPipelineCloseAck(String name, int i, DataNodeAction a
) throws IOException {
FiTestUtil.LOG.info("Running " + name + " ...");
final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
final MarkerConstraint marker = new MarkerConstraint(name);
t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker));
TestFiDataTransferProtocol.write1byte(name);
}
private static void run39_40(String name, int i) throws IOException {
runPipelineCloseAck(name, i, new SleepAction(name, i, 0));
}
/**
* Pipeline close:
* DN1 never responses after received close ack DN2.
* Client gets an IOException and determine DN1 bad.
*/
@Test
public void pipeline_Fi_39() throws IOException {
run39_40(FiTestUtil.getMethodName(), 1);
}
/**
* Pipeline close:
* DN0 never responses after received close ack DN1.
* Client gets an IOException and determine DN0 bad.
*/
@Test
public void pipeline_Fi_40() throws IOException {
run39_40(FiTestUtil.getMethodName(), 0);
}
/**
* Pipeline close with DN0 very slow but it won't lead to timeout.
* Client finishes close successfully.
*/
@Test
public void pipeline_Fi_41() throws IOException {
run41_43(FiTestUtil.getMethodName(), 0);
}
/**
* Pipeline close with DN1 very slow but it won't lead to timeout.
* Client finishes close successfully.
*/
@Test
public void pipeline_Fi_42() throws IOException {
run41_43(FiTestUtil.getMethodName(), 1);
}
/**
* Pipeline close with DN2 very slow but it won't lead to timeout.
* Client finishes close successfully.
*/
@Test
public void pipeline_Fi_43() throws IOException {
run41_43(FiTestUtil.getMethodName(), 2);
}
/**
* Pipeline close:
* DN0 throws an OutOfMemoryException
* right after it received a close request from client.
* Client gets an IOException and determine DN0 bad.
*/
@Test
public void pipeline_Fi_44() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runPipelineCloseTest(methodName, new OomAction(methodName, 0));
}
/**
* Pipeline close:
* DN1 throws an OutOfMemoryException
* right after it received a close request from client.
* Client gets an IOException and determine DN1 bad.
*/
@Test
public void pipeline_Fi_45() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runPipelineCloseTest(methodName, new OomAction(methodName, 1));
}
/**
* Pipeline close:
* DN2 throws an OutOfMemoryException
* right after it received a close request from client.
* Client gets an IOException and determine DN2 bad.
*/
@Test
public void pipeline_Fi_46() throws IOException {
final String methodName = FiTestUtil.getMethodName();
runPipelineCloseTest(methodName, new OomAction(methodName, 2));
}
private static void run47_48(String name, int i) throws IOException {
runPipelineCloseAck(name, i, new OomAction(name, i));
}
/**
* Pipeline close:
* DN1 throws an OutOfMemoryException right after
* it received a close ack from DN2.
* Client gets an IOException and determine DN1 bad.
*/
@Test
public void pipeline_Fi_47() throws IOException {
run47_48(FiTestUtil.getMethodName(), 1);
}
/**
* Pipeline close:
* DN0 throws an OutOfMemoryException right after
* it received a close ack from DN1.
* Client gets an IOException and determine DN0 bad.
*/
@Test
public void pipeline_Fi_48() throws IOException {
run47_48(FiTestUtil.getMethodName(), 0);
}
private static void runBlockFileCloseTest(String methodName,
Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
t.fiBlockFileClose.set(a);
TestFiDataTransferProtocol.write1byte(methodName);
}
private static void run49_51(String name, int i) throws IOException {
runBlockFileCloseTest(name, new IoeAction(name, i, "DISK ERROR"));
}
/**
* Pipeline close:
* DN0 throws a disk error exception when it is closing the block file.
* Client gets an IOException and determine DN0 bad.
*/
@Test
public void pipeline_Fi_49() throws IOException {
run49_51(FiTestUtil.getMethodName(), 0);
}
/**
* Pipeline close:
* DN1 throws a disk error exception when it is closing the block file.
* Client gets an IOException and determine DN1 bad.
*/
@Test
public void pipeline_Fi_50() throws IOException {
run49_51(FiTestUtil.getMethodName(), 1);
}
/**
* Pipeline close:
* DN2 throws a disk error exception when it is closing the block file.
* Client gets an IOException and determine DN2 bad.
*/
@Test
public void pipeline_Fi_51() throws IOException {
run49_51(FiTestUtil.getMethodName(), 2);
}
}

View File

@ -1,46 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.net.URL;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.security.UserGroupInformation;
public aspect FileDataServletAspects {
static final Log LOG = FileDataServlet.LOG;
pointcut callCreateUrl() : call (URL FileDataServlet.createRedirectURL(
String, String, HdfsFileStatus, UserGroupInformation, ClientProtocol,
HttpServletRequest, String));
/** Replace host name with "localhost" for unit test environment. */
URL around () throws IOException : callCreateUrl() {
final URL original = proceed();
LOG.info("FI: original url = " + original);
final URL replaced = new URL("http", "localhost", original.getPort(),
original.getPath() + '?' + original.getQuery());
LOG.info("FI: replaced url = " + replaced);
return replaced;
}
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
/**
* The aspects here are used for testing HDFS implementation of iterative
* directory listing functionality. A directory is deleted right after
* the first listPath RPC.
*/
public privileged aspect ListPathAspects {
public static final Log LOG = LogFactory.getLog(ListPathAspects.class);
pointcut callGetListing(FSNamesystem fd, String src,
byte[] startAfter, boolean needLocation) :
call(DirectoryListing FSNamesystem.getListing(String, byte[], boolean))
&& target(fd)
&& args(src, startAfter, needLocation);
after(FSNamesystem fd, String src, byte[] startAfter, boolean needLocation)
throws IOException, UnresolvedLinkException:
callGetListing(fd, src, startAfter, needLocation) {
LOG.info("FI: callGetListing");
fd.delete(src, true);
}
}

View File

@ -1,66 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.commons.logging.*;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.TestFiRename;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
/**
* The aspects here are used for testing HDFS implementation of rename
* functionality. Failure is introduced during rename to test the atomicity of
* rename.
*/
public privileged aspect RenameAspects {
public static final Log LOG = LogFactory.getLog(RenameAspects.class);
/** When removeChild is called during rename, throw exception */
pointcut callRemove(INode[] inodes, int pos) :
call(* FSDirectory.removeChild(INode[], int))
&& args(inodes, pos)
&& withincode (* FSDirectory.unprotectedRenameTo(String,
String, long, Rename...));
before(INode[] inodes, int pos) throws RuntimeException :
callRemove(inodes, pos) {
LOG.info("FI: callRenameRemove");
if (TestFiRename.throwExceptionOnRemove(inodes[pos].getLocalName())) {
throw new RuntimeException("RenameAspects - on remove " +
inodes[pos].getLocalName());
}
}
/** When addChildNoQuotaCheck is called during rename, throw exception */
pointcut callAddChildNoQuotaCheck(INode[] inodes, int pos, INode node, long diskspace, boolean flag) :
call(* FSDirectory.addChildNoQuotaCheck(INode[], int, INode, long, boolean))
&& args(inodes, pos, node, diskspace, flag)
&& withincode (* FSDirectory.unprotectedRenameTo(String,
String, long, Rename...));
before(INode[] inodes, int pos, INode node, long diskspace, boolean flag)
throws RuntimeException :
callAddChildNoQuotaCheck(inodes, pos, node, diskspace, flag) {
LOG.info("FI: callAddChildNoQuotaCheck");
if (TestFiRename.throwExceptionOnAdd(inodes[pos].getLocalName())) {
throw new RuntimeException("RenameAspects on add " +
inodes[pos].getLocalName());
}
}
}