HDFS-5924. Utilize OOB upgrade message processing for writes. Contributed by Kihwal Lee.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1571792 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2014-02-25 19:24:15 +00:00
parent 6780b086d8
commit 57b28693ee
9 changed files with 347 additions and 27 deletions

View File

@ -93,3 +93,5 @@ HDFS-5535 subtasks:
HDFS-6015. Fix TestBlockRecovery
#testRaceBetweenReplicaRecoveryAndFinalizeBlock. (kihwal)
HDFS-5924. Utilize OOB upgrade message processing for writes. (kihwal)

View File

@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
@ -268,6 +270,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
final int getFileBlockStorageLocationsTimeout;
final int retryTimesForGetLastBlockLength;
final int retryIntervalForGetLastBlockLength;
final long datanodeRestartTimeout;
final boolean useLegacyBlockReader;
final boolean useLegacyBlockReaderLocal;
@ -411,6 +414,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
shortCircuitCacheStaleThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
datanodeRestartTimeout = conf.getLong(
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
}
private DataChecksum.Type getChecksumType(Configuration conf) {

View File

@ -94,6 +94,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 3000;
public static final String DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL = "dfs.client.write.exclude.nodes.cache.expiry.interval.millis";
public static final long DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10 * 60 * 1000; // 10 minutes, in ms
public static final String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY = "dfs.client.datanode-restart.timeout";
public static final long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;
public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
public static final long DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT = 50;
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.BufferOverflowException;
@ -335,6 +336,8 @@ public class DFSOutputStream extends FSOutputSummer
private String[] favoredNodes;
volatile boolean hasError = false;
volatile int errorIndex = -1;
volatile int restartingNodeIndex = -1; // Restarting node index
private long restartDeadline = 0; // Deadline of DN restart
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent
@ -471,7 +474,7 @@ public class DFSOutputStream extends FSOutputSummer
try {
// process datanode IO errors if any
boolean doSleep = false;
if (hasError && errorIndex>=0) {
if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
doSleep = processDatanodeError();
}
@ -571,8 +574,12 @@ public class DFSOutputStream extends FSOutputSummer
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN
errorIndex = 0;
// write to primary DN. If a failed or restarting node has already
// been recorded by the responder, the following call will have no
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
tryMarkPrimaryDatanodeFailed();
throw e;
}
lastPacket = Time.now();
@ -609,12 +616,16 @@ public class DFSOutputStream extends FSOutputSummer
Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
DFSClient.LOG.warn("DataStreamer Exception", e);
// Log warning if there was a real error.
if (restartingNodeIndex == -1) {
DFSClient.LOG.warn("DataStreamer Exception", e);
}
if (e instanceof IOException) {
setLastException((IOException)e);
}
hasError = true;
if (errorIndex == -1) { // not a datanode error
if (errorIndex == -1 && restartingNodeIndex == -1) {
// Not a datanode issue
streamerClosed = true;
}
}
@ -694,6 +705,65 @@ public class DFSOutputStream extends FSOutputSummer
}
}
// The following synchronized methods are used whenever
// errorIndex or restartingNodeIndex is set, because the
// check-and-set needs to be atomic. Simply reading the variables
// does not require synchronization. When the responder is
// not running (e.g. during pipeline recovery), there is no
// need to use these methods.
/**
 * Set the error node index. Called by responder.
 * Declared synchronized so that this write cannot interleave with the
 * check-and-set performed by the other synchronized methods on this object.
 * @param idx index of the failed datanode in the pipeline
 */
synchronized void setErrorIndex(int idx) {
errorIndex = idx;
}
/**
 * Set the restarting node index. Called by responder.
 * As a side effect, errorIndex is reset to -1.
 * @param idx index of the restarting datanode in the pipeline
 */
synchronized void setRestartingNodeIndex(int idx) {
restartingNodeIndex = idx;
// If the data streamer has already marked the primary node as
// bad, clear it. It is likely that the write failed due to
// the DN shutdown. Even if it was a real failure, the pipeline
// recovery will take care of it.
errorIndex = -1;
}
/**
 * Marks the primary datanode (index 0) as failed, but only when neither a
 * failed node nor a restarting node has been recorded yet. Intended for
 * failures that arrive without an explicit error report, where the primary
 * node is the suspect or the cause is unknown.
 */
synchronized void tryMarkPrimaryDatanodeFailed() {
  // An error or an ongoing restart is already recorded; leave it alone.
  if (errorIndex != -1 || restartingNodeIndex != -1) {
    return;
  }
  errorIndex = 0;
}
/**
 * Decides whether waiting for the given pipeline node to restart is
 * worthwhile.
 * @param index the node index
 * @return true if the pipeline has exactly one node or the node's address
 *         is local to this host; false otherwise
 */
boolean shouldWaitForRestart(int index) {
  // Only one node in the pipeline: always wait.
  if (nodes.length == 1) {
    return true;
  }

  // Otherwise wait only when the node is local.
  final InetAddress nodeAddr;
  try {
    nodeAddr = InetAddress.getByName(nodes[index].getIpAddr());
  } catch (java.net.UnknownHostException e) {
    // An IP address is being passed, so resolution should not fail.
    assert false;
    return false;
  }
  return nodeAddr != null && NetUtils.isLocalAddress(nodeAddr);
}
//
// Processes responses from the datanodes. A packet is removed
// from the ackQueue when its response arrives.
@ -727,8 +797,20 @@ public class DFSOutputStream extends FSOutputSummer
// processes response status from datanodes.
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
final Status reply = ack.getReply(i);
// Restart will not be treated differently unless it is
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) &&
shouldWaitForRestart(i)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
Time.now();
setRestartingNodeIndex(i);
String message = "A datanode is restarting: " + targets[i];
DFSClient.LOG.info(message);
throw new IOException(message);
}
// node error
if (reply != SUCCESS) {
errorIndex = i; // first bad datanode
setErrorIndex(i); // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
" from datanode " +
@ -777,12 +859,16 @@ public class DFSOutputStream extends FSOutputSummer
setLastException((IOException)e);
}
hasError = true;
errorIndex = errorIndex==-1 ? 0 : errorIndex;
// If no explicit error report was received, mark the primary
// node as failed.
tryMarkPrimaryDatanodeFailed();
synchronized (dataQueue) {
dataQueue.notifyAll();
}
DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
+ " for block " + block, e);
if (restartingNodeIndex == -1) {
DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
+ " for block " + block, e);
}
responderClosed = true;
}
}
@ -1001,6 +1087,24 @@ public class DFSOutputStream extends FSOutputSummer
boolean success = false;
long newGS = 0L;
while (!success && !streamerClosed && dfsClient.clientRunning) {
// Sleep before reconnect if a dn is restarting.
// This process will be repeated until the deadline or the datanode
// starts back up.
if (restartingNodeIndex >= 0) {
// 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this
// interval until timeout or success.
long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
4000L);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
lastException.set(new IOException("Interrupted while waiting for " +
"datanode to restart. " + nodes[restartingNodeIndex]));
streamerClosed = true;
return false;
}
}
boolean isRecovery = hasError;
// remove bad datanode from list of datanodes.
// If errorIndex was not set (i.e. appends), then do not remove
@ -1037,7 +1141,24 @@ public class DFSOutputStream extends FSOutputSummer
setPipeline(newnodes, newStorageIDs);
hasError = false;
// Just took care of a node error while waiting for a node restart
if (restartingNodeIndex >= 0) {
// If the error came from a node further away than the restarting
// node, the restart must have been complete.
if (errorIndex > restartingNodeIndex) {
restartingNodeIndex = -1;
} else if (errorIndex < restartingNodeIndex) {
// the node index has shifted.
restartingNodeIndex--;
} else {
// this shouldn't happen...
assert false;
}
}
if (restartingNodeIndex == -1) {
hasError = false;
}
lastException.set(null);
errorIndex = -1;
}
@ -1066,7 +1187,34 @@ public class DFSOutputStream extends FSOutputSummer
} else {
success = createBlockOutputStream(nodes, newGS, isRecovery);
}
}
if (restartingNodeIndex >= 0) {
assert hasError == true;
// check errorIndex set above
if (errorIndex == restartingNodeIndex) {
// ignore, if came from the restarting node
errorIndex = -1;
}
// still within the deadline
if (Time.now() < restartDeadline) {
continue; // with in the deadline
}
// expired. declare the restarting node dead
restartDeadline = 0;
int expiredNodeIndex = restartingNodeIndex;
restartingNodeIndex = -1;
DFSClient.LOG.warn("Datanode did not restart in time: " +
nodes[expiredNodeIndex]);
// Mark the restarting node as failed. If there is any other failed
// node during the last pipeline construction attempt, it will not be
// overwritten/dropped. In this case, the restarting node will get
// excluded in the following attempt, if it still does not come up.
if (errorIndex == -1) {
errorIndex = expiredNodeIndex;
}
// From this point on, normal pipeline recovery applies.
}
} // while
if (success) {
// update pipeline at the namenode
@ -1144,6 +1292,7 @@ public class DFSOutputStream extends FSOutputSummer
}
Status pipelineStatus = SUCCESS;
String firstBadLink = "";
boolean checkRestart = false;
if (DFSClient.LOG.isDebugEnabled()) {
for (int i = 0; i < nodes.length; i++) {
DFSClient.LOG.debug("pipeline = " + nodes[i]);
@ -1192,6 +1341,16 @@ public class DFSOutputStream extends FSOutputSummer
pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();
// Got a restart OOB ack.
// If a node is already restarting, this status is not likely from
// the same node. If it is from a different node, it is not
// from the local datanode. Thus it is safe to treat this as a
// regular node error.
if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
restartingNodeIndex == -1) {
checkRestart = true;
throw new IOException("A datanode is restarting.");
}
if (pipelineStatus != SUCCESS) {
if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
@ -1205,9 +1364,12 @@ public class DFSOutputStream extends FSOutputSummer
assert null == blockStream : "Previous blockStream unclosed";
blockStream = out;
result = true; // success
restartingNodeIndex = -1;
hasError = false;
} catch (IOException ie) {
DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
if (restartingNodeIndex == -1) {
DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
}
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to "
@ -1230,8 +1392,18 @@ public class DFSOutputStream extends FSOutputSummer
}
}
} else {
assert checkRestart == false;
errorIndex = 0;
}
// Check whether there is a restart worth waiting for.
if (checkRestart && shouldWaitForRestart(errorIndex)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
Time.now();
restartingNodeIndex = errorIndex;
errorIndex = -1;
DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
nodes[restartingNodeIndex]);
}
hasError = true;
setLastException(ie);
result = false; // error

View File

@ -23,8 +23,10 @@ import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@ -52,6 +55,7 @@ import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
@ -116,6 +120,7 @@ class BlockReceiver implements Closeable {
private final boolean isTransfer;
private boolean syncOnClose;
private long restartBudget;
BlockReceiver(final ExtendedBlock block, final DataInputStream in,
final String inAddr, final String myAddr,
@ -135,6 +140,7 @@ class BlockReceiver implements Closeable {
this.clientname = clientname;
this.isDatanode = clientname.length() == 0;
this.isClient = !this.isDatanode;
this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
//for datanode, we have
//1: clientName.length() == 0, and
@ -742,16 +748,35 @@ class BlockReceiver implements Closeable {
if (responder != null) {
// In case this datanode is shutting down for quick restart,
// send a special ack upstream.
if (datanode.isRestarting()) {
if (datanode.isRestarting() && isClient && !isTransfer) {
File blockFile = ((ReplicaInPipeline)replicaInfo).getBlockFile();
File restartMeta = new File(blockFile.getParent() +
File.pathSeparator + "." + blockFile.getName() + ".restart");
if (restartMeta.exists()) {
restartMeta.delete();
}
try {
FileWriter out = new FileWriter(restartMeta);
// write out the expiration time (current time + restart budget).
out.write(Long.toString(Time.now() + restartBudget));
out.flush();
out.close();
} catch (IOException ioe) {
// The worst case is not recovering this RBW replica.
// Client will fall back to regular pipeline recovery.
}
try {
((PacketResponder) responder.getRunnable()).
sendOOBResponse(PipelineAck.getRestartOOBStatus());
// Even if the connection is closed after the ack packet is
// flushed, the client can react to the connection closure
// first. Insert a delay to lower the chance of client
// missing the OOB ack.
Thread.sleep(1000);
} catch (InterruptedException ie) {
// It is already going down. Ignore this.
} catch (IOException ioe) {
LOG.info("Error sending OOB Ack.", ioe);
// The OOB ack could not be sent. Since the datanode is going
// down, this is ignored.
}
}
responder.interrupt();
@ -1279,7 +1304,6 @@ class BlockReceiver implements Closeable {
&& offsetInBlock > replicaInfo.getBytesAcked()) {
replicaInfo.setBytesAcked(offsetInBlock);
}
// send my ack back to upstream datanode
replyAck.write(upstreamOut);
upstreamOut.flush();

View File

@ -46,6 +46,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NA
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -82,6 +84,7 @@ public class DNConf {
final String encryptionAlgorithm;
final long xceiverStopTimeout;
final long restartReplicaExpiry;
final long maxLockedMemory;
@ -157,6 +160,10 @@ public class DNConf {
this.maxLockedMemory = conf.getLong(
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
this.restartReplicaExpiry = conf.getLong(
DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
}
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.

View File

@ -21,9 +21,11 @@ import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DU;
@ -36,11 +38,13 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
/**
* A block pool slice represents a portion of a block pool stored on a volume.
@ -191,9 +195,35 @@ class BlockPoolSlice {
newReplica = new FinalizedReplica(blockId,
blockFile.length(), genStamp, volume, blockFile.getParentFile());
} else {
newReplica = new ReplicaWaitingToBeRecovered(blockId,
validateIntegrityAndSetLength(blockFile, genStamp),
genStamp, volume, blockFile.getParentFile());
boolean loadRwr = true;
File restartMeta = new File(blockFile.getParent() +
File.pathSeparator + "." + blockFile.getName() + ".restart");
Scanner sc = null;
try {
sc = new Scanner(restartMeta);
// The restart meta file exists
if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
// It didn't expire. Load the replica as a RBW.
newReplica = new ReplicaBeingWritten(blockId,
validateIntegrityAndSetLength(blockFile, genStamp),
genStamp, volume, blockFile.getParentFile(), null);
loadRwr = false;
}
restartMeta.delete();
} catch (FileNotFoundException fnfe) {
// nothing to do here
} finally {
if (sc != null) {
sc.close();
}
}
// Restart meta doesn't exist or expired.
if (loadRwr) {
newReplica = new ReplicaWaitingToBeRecovered(blockId,
validateIntegrityAndSetLength(blockFile, genStamp),
genStamp, volume, blockFile.getParentFile());
}
}
ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
@ -298,4 +328,4 @@ class BlockPoolSlice {
void shutdown() {
dfsUsage.shutdown();
}
}
}

View File

@ -1030,6 +1030,18 @@
</description>
</property>
<property>
<name>dfs.client.datanode-restart.timeout</name>
<value>30</value>
<description>
Expert only. The time to wait, in seconds, from reception of a
datanode shutdown notification for quick restart, until declaring
the datanode dead and invoking the normal recovery mechanisms.
The notification is sent by a datanode when it is being shut down
using the shutdownDatanode admin command with the upgrade option.
</description>
</property>
<property>
<name>dfs.nameservices</name>
<value></value>

View File

@ -162,19 +162,25 @@ public class TestClientProtocolForPipelineRecovery {
}
}
/** Test recovery on restart OOB message */
/**
* Test recovery on restart OOB message. It also tests the delivery of
* OOB ack originating from the primary datanode. Since there is only
* one node in the cluster, failure of restart-recovery will fail the
* test.
*/
@Test
public void testPipelineRecoveryOnOOB() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "15");
MiniDFSCluster cluster = null;
try {
int numDataNodes = 3;
int numDataNodes = 1;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
Path file = new Path("dataprotocol2.dat");
DFSTestUtil.createFile(fileSys, file, 10240L, (short)2, 0L);
DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
DFSOutputStream out = (DFSOutputStream)(fileSys.append(file).
getWrappedStream());
out.write(1);
@ -186,10 +192,66 @@ public class TestClientProtocolForPipelineRecovery {
// issue shutdown to the datanode.
final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
Assert.assertEquals(0, dfsadmin.run(args1));
// Wait long enough to receive an OOB ack before closing the file.
Thread.sleep(4000);
// Restart the datanode
cluster.restartDataNode(0, true);
// The following forces a data packet and end of block packets to be sent.
out.close();
Thread.sleep(3000);
final String[] args2 = {"-getDatanodeInfo", dnAddr };
Assert.assertEquals(-1, dfsadmin.run(args2));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/** Test restart timeout */
@Test
public void testPipelineRecoveryOnRestartFailure() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "5");
MiniDFSCluster cluster = null;
try {
int numDataNodes = 2;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
Path file = new Path("dataprotocol3.dat");
DFSTestUtil.createFile(fileSys, file, 10240L, (short)2, 0L);
DFSOutputStream out = (DFSOutputStream)(fileSys.append(file).
getWrappedStream());
out.write(1);
out.hflush();
DFSAdmin dfsadmin = new DFSAdmin(conf);
DataNode dn = cluster.getDataNodes().get(0);
final String dnAddr1 = dn.getDatanodeId().getIpcAddr(false);
// issue shutdown to the datanode.
final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
Assert.assertEquals(0, dfsadmin.run(args1));
Thread.sleep(4000);
// This should succeed without restarting the node. The restart will
// expire and regular pipeline recovery will kick in.
out.close();
// At this point there is only one node in the cluster.
out = (DFSOutputStream)(fileSys.append(file).
getWrappedStream());
out.write(1);
out.hflush();
dn = cluster.getDataNodes().get(1);
final String dnAddr2 = dn.getDatanodeId().getIpcAddr(false);
// issue shutdown to the datanode.
final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
Assert.assertEquals(0, dfsadmin.run(args2));
Thread.sleep(4000);
try {
// close should fail
out.close();
assert false;
} catch (IOException ioe) { }
} finally {
if (cluster != null) {
cluster.shutdown();