From 57b28693ee295746c6d168d37dd05eaf7b601b87 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Tue, 25 Feb 2014 19:24:15 +0000 Subject: [PATCH] HDFS-5924. Utilize OOB upgrade message processing for writes. Contributed by Kihwal Lee. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1571792 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES_HDFS-5535.txt | 2 + .../org/apache/hadoop/hdfs/DFSClient.java | 7 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + .../apache/hadoop/hdfs/DFSOutputStream.java | 198 ++++++++++++++++-- .../hdfs/server/datanode/BlockReceiver.java | 32 ++- .../hadoop/hdfs/server/datanode/DNConf.java | 7 + .../fsdataset/impl/BlockPoolSlice.java | 38 +++- .../src/main/resources/hdfs-default.xml | 12 ++ ...TestClientProtocolForPipelineRecovery.java | 74 ++++++- 9 files changed, 347 insertions(+), 27 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt index 9305b069421..facf58ed5ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt @@ -93,3 +93,5 @@ HDFS-5535 subtasks: HDFS-6015. Fix TestBlockRecovery #testRaceBetweenReplicaRecoveryAndFinalizeBlock. (kihwal) + + HDFS-5924. Utilize OOB upgrade message processing for writes. (kihwal) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index d908eb4e278..3c066c78f76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT; @@ -268,6 +270,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { final int getFileBlockStorageLocationsTimeout; final int retryTimesForGetLastBlockLength; final int retryIntervalForGetLastBlockLength; + final long datanodeRestartTimeout; final boolean useLegacyBlockReader; final boolean useLegacyBlockReaderLocal; @@ -411,6 +414,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { shortCircuitCacheStaleThresholdMs = conf.getLong( DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS, DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT); + + datanodeRestartTimeout = conf.getLong( + DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, + DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000; } private DataChecksum.Type getChecksumType(Configuration conf) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index d77bdc17f0b..7789b60fe38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -94,6 +94,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 3000; public static final String DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL = "dfs.client.write.exclude.nodes.cache.expiry.interval.millis"; public static final long DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10 * 60 * 1000; // 10 minutes, in ms + public static final String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY = "dfs.client.datanode-restart.timeout"; + public static final long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30; + public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration"; + public static final long DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT = 50; public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address"; public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100"; public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index dc5ccf4a5f2..c1cf238d592 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.BufferOverflowException; @@ -335,6 +336,8 @@ public class DFSOutputStream extends FSOutputSummer private String[] favoredNodes; volatile boolean hasError = false; volatile int errorIndex = -1; + volatile int restartingNodeIndex = -1; // Restarting node index + private long restartDeadline = 0; // Deadline of DN restart private BlockConstructionStage stage; // block construction stage private long bytesSent = 0; // number of bytes that've been sent @@ -471,7 +474,7 @@ public class DFSOutputStream extends FSOutputSummer try { // process datanode IO errors if any boolean doSleep = false; - if (hasError && errorIndex>=0) { + if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) { doSleep = processDatanodeError(); } @@ -571,8 +574,12 @@ public class DFSOutputStream extends FSOutputSummer blockStream.flush(); } catch (IOException e) { // HDFS-3398 treat primary DN is down since client is unable to - // write to primary DN - errorIndex = 0; + // write to primary DN. If a failed or restarting node has already + // been recorded by the responder, the following call will have no + // effect. Pipeline recovery can handle only one node error at a + // time. If the primary node fails again during the recovery, it + // will be taken out then. 
+ tryMarkPrimaryDatanodeFailed(); throw e; } lastPacket = Time.now(); @@ -609,12 +616,16 @@ public class DFSOutputStream extends FSOutputSummer Thread.sleep(artificialSlowdown); } } catch (Throwable e) { - DFSClient.LOG.warn("DataStreamer Exception", e); + // Log warning if there was a real error. + if (restartingNodeIndex == -1) { + DFSClient.LOG.warn("DataStreamer Exception", e); + } if (e instanceof IOException) { setLastException((IOException)e); } hasError = true; - if (errorIndex == -1) { // not a datanode error + if (errorIndex == -1 && restartingNodeIndex == -1) { + // Not a datanode issue streamerClosed = true; } } @@ -694,6 +705,65 @@ public class DFSOutputStream extends FSOutputSummer } } + // The following synchronized methods are used whenever + // errorIndex or restartingNodeIndex is set. This is because + // check & set needs to be atomic. Simply reading variables + // does not require a synchronization. When responder is + // not running (e.g. during pipeline recovery), there is no + // need to use these methods. + + /** Set the error node index. Called by responder */ + synchronized void setErrorIndex(int idx) { + errorIndex = idx; + } + + /** Set the restarting node index. Called by responder */ + synchronized void setRestartingNodeIndex(int idx) { + restartingNodeIndex = idx; + // If the data streamer has already set the primary node + // bad, clear it. It is likely that the write failed due to + // the DN shutdown. Even if it was a real failure, the pipeline + // recovery will take care of it. + errorIndex = -1; + } + + /** + * This method is used when no explicit error report was received, + * but something failed. When the primary node is a suspect or + * unsure about the cause, the primary node is marked as failed. + */ + synchronized void tryMarkPrimaryDatanodeFailed() { + // There should be no existing error and no ongoing restart. + if ((errorIndex == -1) && (restartingNodeIndex == -1)) { + errorIndex = 0; + } + } + + /** + * Examine whether it is worth waiting for a node to restart. + * @param index the node index + */ + boolean shouldWaitForRestart(int index) { + // Only one node in the pipeline. + if (nodes.length == 1) { + return true; + } + + // Is it a local node? + InetAddress addr = null; + try { + addr = InetAddress.getByName(nodes[index].getIpAddr()); + } catch (java.net.UnknownHostException e) { + // we are passing an ip address. this should not happen. + assert false; + } + + if (addr != null && NetUtils.isLocalAddress(addr)) { + return true; + } + return false; + } + // // Processes responses from the datanodes. A packet is removed // from the ackQueue when its response arrives. @@ -727,8 +797,20 @@ public class DFSOutputStream extends FSOutputSummer // processes response status from datanodes. for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) { final Status reply = ack.getReply(i); + // Restart will not be treated differently unless it is + // the local node or the only one in the pipeline. 
+ if (PipelineAck.isRestartOOBStatus(reply) && + shouldWaitForRestart(i)) { + restartDeadline = dfsClient.getConf().datanodeRestartTimeout + + Time.now(); + setRestartingNodeIndex(i); + String message = "A datanode is restarting: " + targets[i]; + DFSClient.LOG.info(message); + throw new IOException(message); + } + // node error if (reply != SUCCESS) { - errorIndex = i; // first bad datanode + setErrorIndex(i); // first bad datanode throw new IOException("Bad response " + reply + " for block " + block + " from datanode " + @@ -777,12 +859,16 @@ public class DFSOutputStream extends FSOutputSummer setLastException((IOException)e); } hasError = true; - errorIndex = errorIndex==-1 ? 0 : errorIndex; + // If no explicit error report was received, mark the primary + // node as failed. + tryMarkPrimaryDatanodeFailed(); synchronized (dataQueue) { dataQueue.notifyAll(); } - DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception " - + " for block " + block, e); + if (restartingNodeIndex == -1) { + DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception " + + " for block " + block, e); + } responderClosed = true; } } @@ -1001,6 +1087,24 @@ public class DFSOutputStream extends FSOutputSummer boolean success = false; long newGS = 0L; while (!success && !streamerClosed && dfsClient.clientRunning) { + // Sleep before reconnect if a dn is restarting. + // This process will be repeated until the deadline or the datanode + // starts back up. + if (restartingNodeIndex >= 0) { + // 4 seconds or the configured deadline period, whichever is shorter. + // This is the retry interval and recovery will be retried in this + // interval until timeout or success. + long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout, + 4000L); + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + lastException.set(new IOException("Interrupted while waiting for " + + "datanode to restart. " + nodes[restartingNodeIndex])); + streamerClosed = true; + return false; + } + } boolean isRecovery = hasError; // remove bad datanode from list of datanodes. // If errorIndex was not set (i.e. appends), then do not remove @@ -1037,7 +1141,24 @@ public class DFSOutputStream extends FSOutputSummer setPipeline(newnodes, newStorageIDs); - hasError = false; + // Just took care of a node error while waiting for a node restart + if (restartingNodeIndex >= 0) { + // If the error came from a node further away than the restarting + // node, the restart must have been complete. + if (errorIndex > restartingNodeIndex) { + restartingNodeIndex = -1; + } else if (errorIndex < restartingNodeIndex) { + // the node index has shifted. + restartingNodeIndex--; + } else { + // this shouldn't happen... + assert false; + } + } + + if (restartingNodeIndex == -1) { + hasError = false; + } lastException.set(null); errorIndex = -1; } @@ -1066,7 +1187,34 @@ public class DFSOutputStream extends FSOutputSummer } else { success = createBlockOutputStream(nodes, newGS, isRecovery); } - } + + if (restartingNodeIndex >= 0) { + assert hasError == true; + // check errorIndex set above + if (errorIndex == restartingNodeIndex) { + // ignore, if came from the restarting node + errorIndex = -1; + } + // still within the deadline + if (Time.now() < restartDeadline) { + continue; // with in the deadline + } + // expired. 
declare the restarting node dead + restartDeadline = 0; + int expiredNodeIndex = restartingNodeIndex; + restartingNodeIndex = -1; + DFSClient.LOG.warn("Datanode did not restart in time: " + + nodes[expiredNodeIndex]); + // Mark the restarting node as failed. If there is any other failed + // node during the last pipeline construction attempt, it will not be + // overwritten/dropped. In this case, the restarting node will get + // excluded in the following attempt, if it still does not come up. + if (errorIndex == -1) { + errorIndex = expiredNodeIndex; + } + // From this point on, normal pipeline recovery applies. + } + } // while if (success) { // update pipeline at the namenode @@ -1144,6 +1292,7 @@ public class DFSOutputStream extends FSOutputSummer } Status pipelineStatus = SUCCESS; String firstBadLink = ""; + boolean checkRestart = false; if (DFSClient.LOG.isDebugEnabled()) { for (int i = 0; i < nodes.length; i++) { DFSClient.LOG.debug("pipeline = " + nodes[i]); @@ -1192,6 +1341,16 @@ public class DFSOutputStream extends FSOutputSummer pipelineStatus = resp.getStatus(); firstBadLink = resp.getFirstBadLink(); + // Got an restart OOB ack. + // If a node is already restarting, this status is not likely from + // the same node. If it is from a different node, it is not + // from the local datanode. Thus it is safe to treat this as a + // regular node error. + if (PipelineAck.isRestartOOBStatus(pipelineStatus) && + restartingNodeIndex == -1) { + checkRestart = true; + throw new IOException("A datanode is restarting."); + } if (pipelineStatus != SUCCESS) { if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException( @@ -1205,9 +1364,12 @@ public class DFSOutputStream extends FSOutputSummer assert null == blockStream : "Previous blockStream unclosed"; blockStream = out; result = true; // success - + restartingNodeIndex = -1; + hasError = false; } catch (IOException ie) { - DFSClient.LOG.info("Exception in createBlockOutputStream", ie); + if (restartingNodeIndex == -1) { + DFSClient.LOG.info("Exception in createBlockOutputStream", ie); + } if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { DFSClient.LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " @@ -1230,8 +1392,18 @@ public class DFSOutputStream extends FSOutputSummer } } } else { + assert checkRestart == false; errorIndex = 0; } + // Check whether there is a restart worth waiting for. 
+ if (checkRestart && shouldWaitForRestart(errorIndex)) { + restartDeadline = dfsClient.getConf().datanodeRestartTimeout + + Time.now(); + restartingNodeIndex = errorIndex; + errorIndex = -1; + DFSClient.LOG.info("Waiting for the datanode to be restarted: " + + nodes[restartingNodeIndex]); + } hasError = true; setLastException(ie); result = false; // error diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 29d416ad529..3e6f5c8a414 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -23,8 +23,10 @@ import java.io.BufferedOutputStream; import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.File; import java.io.FileDescriptor; import java.io.FileOutputStream; +import java.io.FileWriter; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -52,6 +55,7 @@ import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; @@ -116,6 +120,7 @@ class BlockReceiver implements Closeable { private final boolean isTransfer; private boolean syncOnClose; + private long restartBudget; BlockReceiver(final ExtendedBlock block, final DataInputStream in, final String inAddr, final String myAddr, @@ -135,6 +140,7 @@ class BlockReceiver implements Closeable { this.clientname = clientname; this.isDatanode = clientname.length() == 0; this.isClient = !this.isDatanode; + this.restartBudget = datanode.getDnConf().restartReplicaExpiry; //for datanode, we have //1: clientName.length() == 0, and @@ -742,16 +748,35 @@ class BlockReceiver implements Closeable { if (responder != null) { // In case this datanode is shutting down for quick restart, // send a special ack upstream. - if (datanode.isRestarting()) { + if (datanode.isRestarting() && isClient && !isTransfer) { + File blockFile = ((ReplicaInPipeline)replicaInfo).getBlockFile(); + File restartMeta = new File(blockFile.getParent() + + File.pathSeparator + "." + blockFile.getName() + ".restart"); + if (restartMeta.exists()) { + restartMeta.delete(); + } + try { + FileWriter out = new FileWriter(restartMeta); + // write out the current time. + out.write(Long.toString(Time.now() + restartBudget)); + out.flush(); + out.close(); + } catch (IOException ioe) { + // The worst case is not recovering this RBW replica. + // Client will fall back to regular pipeline recovery. + } try { ((PacketResponder) responder.getRunnable()). 
sendOOBResponse(PipelineAck.getRestartOOBStatus()); + // Even if the connection is closed after the ack packet is + // flushed, the client can react to the connection closure + // first. Insert a delay to lower the chance of client + // missing the OOB ack. + Thread.sleep(1000); } catch (InterruptedException ie) { // It is already going down. Ignore this. } catch (IOException ioe) { LOG.info("Error sending OOB Ack.", ioe); - // The OOB ack could not be sent. Since the datanode is going - // down, this is ignored. } } responder.interrupt(); @@ -1279,7 +1304,6 @@ class BlockReceiver implements Closeable { && offsetInBlock > replicaInfo.getBytesAcked()) { replicaInfo.setBytesAcked(offsetInBlock); } - // send my ack back to upstream datanode replyAck.write(upstreamOut); upstreamOut.flush(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 73f3661182f..199a2c0c389 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -46,6 +46,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NA import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -82,6 +84,7 @@ public class DNConf { final String encryptionAlgorithm; final long xceiverStopTimeout; + final long restartReplicaExpiry; final long maxLockedMemory; @@ -157,6 +160,10 @@ public class DNConf { this.maxLockedMemory = conf.getLong( DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT); + + this.restartReplicaExpiry = conf.getLong( + DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY, + DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L; } // We get minimumNameNodeVersion via a method so it can be mocked out in tests. 
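The datanode-side mechanism comes together in the hunks above and below: when the datanode is shut down for a quick restart, BlockReceiver writes an expiry timestamp (Time.now() plus the dfs.datanode.restart.replica.expiration budget that DNConf converts to milliseconds) into a hidden ".<blockname>.restart" file next to the RBW block file, and on startup BlockPoolSlice reloads that replica as RBW rather than RWR if the marker has not yet expired. A rough standalone sketch of the round trip, not taken from the patch and using illustrative names (RestartMarker, markerFor, a /tmp block path) in place of the real datanode classes:

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Scanner;

/**
 * Illustrative sketch of the restart-marker handshake between the
 * shutdown path (BlockReceiver) and the startup path (BlockPoolSlice).
 * Class and method names are hypothetical.
 */
public class RestartMarker {

  /** Hidden marker kept beside the block file, e.g. ".blk_1234.restart". */
  static File markerFor(File blockFile) {
    return new File(blockFile.getParentFile(), "." + blockFile.getName() + ".restart");
  }

  /** Shutdown side: record the absolute time (ms) until which the RBW replica stays live. */
  static void writeMarker(File blockFile, long restartBudgetMs) throws IOException {
    File marker = markerFor(blockFile);
    try (FileWriter out = new FileWriter(marker)) {
      out.write(Long.toString(System.currentTimeMillis() + restartBudgetMs));
    }
  }

  /** Startup side: true if an unexpired marker exists; the marker is removed either way. */
  static boolean shouldLoadAsRbw(File blockFile) {
    File marker = markerFor(blockFile);
    boolean loadRbw = false;
    try (Scanner sc = new Scanner(marker)) {
      loadRbw = sc.hasNextLong() && sc.nextLong() > System.currentTimeMillis();
    } catch (FileNotFoundException fnfe) {
      // No marker: this was not a quick-restart shutdown; fall back to RWR handling.
    }
    marker.delete();
    return loadRbw;
  }

  public static void main(String[] args) throws IOException {
    File blockFile = new File("/tmp/blk_0000000000000001234"); // hypothetical RBW block file
    writeMarker(blockFile, 50 * 1000L);                        // dfs.datanode.restart.replica.expiration
    System.out.println("load as RBW? " + shouldLoadAsRbw(blockFile));
  }
}

As in the patch, the marker is consumed on startup whether or not it is honored, so a stale marker cannot promote a replica on a later, unrelated restart.
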
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index ed9ba589dc6..e3e441028de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -21,9 +21,11 @@ import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; +import java.util.Scanner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DU; @@ -36,11 +38,13 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; +import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.Time; /** * A block pool slice represents a portion of a block pool stored on a volume. @@ -191,9 +195,35 @@ class BlockPoolSlice { newReplica = new FinalizedReplica(blockId, blockFile.length(), genStamp, volume, blockFile.getParentFile()); } else { - newReplica = new ReplicaWaitingToBeRecovered(blockId, - validateIntegrityAndSetLength(blockFile, genStamp), - genStamp, volume, blockFile.getParentFile()); + + boolean loadRwr = true; + File restartMeta = new File(blockFile.getParent() + + File.pathSeparator + "." + blockFile.getName() + ".restart"); + Scanner sc = null; + try { + sc = new Scanner(restartMeta); + // The restart meta file exists + if (sc.hasNextLong() && (sc.nextLong() > Time.now())) { + // It didn't expire. Load the replica as a RBW. + newReplica = new ReplicaBeingWritten(blockId, + validateIntegrityAndSetLength(blockFile, genStamp), + genStamp, volume, blockFile.getParentFile(), null); + loadRwr = false; + } + restartMeta.delete(); + } catch (FileNotFoundException fnfe) { + // nothing to do here + } finally { + if (sc != null) { + sc.close(); + } + } + // Restart meta doesn't exist or expired. + if (loadRwr) { + newReplica = new ReplicaWaitingToBeRecovered(blockId, + validateIntegrityAndSetLength(blockFile, genStamp), + genStamp, volume, blockFile.getParentFile()); + } } ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica); @@ -298,4 +328,4 @@ class BlockPoolSlice { void shutdown() { dfsUsage.shutdown(); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 26c7b70adb1..2468cb10745 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1030,6 +1030,18 @@ + + dfs.client.datanode-restart.timeout + 30 + + Expert only. 
The time to wait, in seconds, from reception of an + datanode shutdown notification for quick restart, until declaring + the datanode dead and invoking the normal recovery mechanisms. + The notification is sent by a datanode when it is being shutdown + using the shutdownDatanode admin command with the upgrade option. + + + dfs.nameservices diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java index d583bb302ce..04853bd2df7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java @@ -162,19 +162,25 @@ public class TestClientProtocolForPipelineRecovery { } } - /** Test recovery on restart OOB message */ + /** + * Test recovery on restart OOB message. It also tests the delivery of + * OOB ack originating from the primary datanode. Since there is only + * one node in the cluster, failure of restart-recovery will fail the + * test. + */ @Test public void testPipelineRecoveryOnOOB() throws Exception { Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "15"); MiniDFSCluster cluster = null; try { - int numDataNodes = 3; + int numDataNodes = 1; cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); cluster.waitActive(); FileSystem fileSys = cluster.getFileSystem(); Path file = new Path("dataprotocol2.dat"); - DFSTestUtil.createFile(fileSys, file, 10240L, (short)2, 0L); + DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L); DFSOutputStream out = (DFSOutputStream)(fileSys.append(file). getWrappedStream()); out.write(1); @@ -186,10 +192,66 @@ public class TestClientProtocolForPipelineRecovery { // issue shutdown to the datanode. final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" }; Assert.assertEquals(0, dfsadmin.run(args1)); + // Wait long enough to receive an OOB ack before closing the file. + Thread.sleep(4000); + // Retart the datanode + cluster.restartDataNode(0, true); + // The following forces a data packet and end of block packets to be sent. out.close(); - Thread.sleep(3000); - final String[] args2 = {"-getDatanodeInfo", dnAddr }; - Assert.assertEquals(-1, dfsadmin.run(args2)); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** Test restart timeout */ + @Test + public void testPipelineRecoveryOnRestartFailure() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "5"); + MiniDFSCluster cluster = null; + try { + int numDataNodes = 2; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); + cluster.waitActive(); + FileSystem fileSys = cluster.getFileSystem(); + + Path file = new Path("dataprotocol3.dat"); + DFSTestUtil.createFile(fileSys, file, 10240L, (short)2, 0L); + DFSOutputStream out = (DFSOutputStream)(fileSys.append(file). + getWrappedStream()); + out.write(1); + out.hflush(); + + DFSAdmin dfsadmin = new DFSAdmin(conf); + DataNode dn = cluster.getDataNodes().get(0); + final String dnAddr1 = dn.getDatanodeId().getIpcAddr(false); + // issue shutdown to the datanode. 
+ final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" }; + Assert.assertEquals(0, dfsadmin.run(args1)); + Thread.sleep(4000); + // This should succeed without restarting the node. The restart will + // expire and regular pipeline recovery will kick in. + out.close(); + + // At this point there is only one node in the cluster. + out = (DFSOutputStream)(fileSys.append(file). + getWrappedStream()); + out.write(1); + out.hflush(); + + dn = cluster.getDataNodes().get(1); + final String dnAddr2 = dn.getDatanodeId().getIpcAddr(false); + // issue shutdown to the datanode. + final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" }; + Assert.assertEquals(0, dfsadmin.run(args2)); + Thread.sleep(4000); + try { + // close should fail + out.close(); + assert false; + } catch (IOException ioe) { } } finally { if (cluster != null) { cluster.shutdown();
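
On the client side, the DFSOutputStream changes earlier in this patch amount to a bounded wait: when a restart OOB ack arrives from the local (or only) datanode in the pipeline, the streamer records a deadline of dfs.client.datanode-restart.timeout seconds and keeps retrying pipeline construction at intervals of at most four seconds; if the node is not back before the deadline, it is marked failed and normal pipeline recovery takes over. A simplified sketch of that retry shape, with a hypothetical tryRebuildPipeline callback standing in for createBlockOutputStream() rather than the real DFSOutputStream API:

import java.util.concurrent.Callable;

/**
 * Simplified sketch (not the real DFSOutputStream API) of the bounded
 * wait-for-restart loop added to setupPipelineForAppendOrRecovery.
 */
public class RestartWait {

  /** Retry until the pipeline is rebuilt or restartTimeoutMs has elapsed. */
  static boolean waitForRestart(Callable<Boolean> tryRebuildPipeline,
                                long restartTimeoutMs) throws Exception {
    long deadline = System.currentTimeMillis() + restartTimeoutMs;  // restartDeadline
    long retryInterval = Math.min(restartTimeoutMs, 4000L);         // 4s or the timeout, whichever is shorter
    while (System.currentTimeMillis() < deadline) {
      if (tryRebuildPipeline.call()) {
        return true;                 // datanode came back; pipeline recovered
      }
      Thread.sleep(retryInterval);   // wait before the next reconnect attempt
    }
    return false;                    // deadline expired: the caller marks the node
                                     // failed and normal pipeline recovery applies
  }

  public static void main(String[] args) throws Exception {
    final long start = System.currentTimeMillis();
    // Pretend the datanode takes about three seconds to come back up.
    boolean recovered = waitForRestart(
        () -> System.currentTimeMillis() - start > 3_000L, 10_000L);
    System.out.println("recovered within timeout? " + recovered);
  }
}

The four-second cap mirrors the Math.min(dfsClient.getConf().datanodeRestartTimeout, 4000L) delay in the streamer, and the expiry branch mirrors the code that logs "Datanode did not restart in time" and marks the restarting node as failed.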