From 1c6b5d2b5841e5219a98937088cde4ae63869f80 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Mon, 24 Feb 2014 23:38:04 +0000 Subject: [PATCH] HDFS-5583. Make DN send an OOB Ack on shutdown before restarting. Contributed by Kihwal Lee. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1571491 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES_HDFS-5535.txt | 1 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + .../protocol/datatransfer/PipelineAck.java | 68 +++++- .../hdfs/server/datanode/BlockReceiver.java | 204 +++++++++++++++--- .../hadoop/hdfs/server/datanode/DataNode.java | 93 ++++++-- .../hdfs/server/datanode/DataXceiver.java | 2 +- .../server/datanode/DataXceiverServer.java | 75 +++++-- .../src/main/proto/datatransfer.proto | 4 + ...TestClientProtocolForPipelineRecovery.java | 37 ++++ 9 files changed, 416 insertions(+), 70 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt index 352a826af4b..bbc1df26ded 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt @@ -82,3 +82,4 @@ HDFS-5535 subtasks: HDFS-6004. Change DFSAdmin for rolling upgrade commands. (szetszwo via Arpit Agarwal) + HDFS-5583. Make DN send an OOB Ack on shutdown before restarting. (kihwal) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index dc6538e0718..b127209bb84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -225,6 +225,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false; public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive"; public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 4000; + public static final String DFS_DATANODE_OOB_TIMEOUT_KEY = "dfs.datanode.oob.timeout-ms"; + public static final String DFS_DATANODE_OOB_TIMEOUT_DEFAULT = "1500,0,0,0"; // OOB_TYPE1, OOB_TYPE2, OOB_TYPE3, OOB_TYPE4 public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check"; public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java index b743e29f217..55cbc9a40ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java @@ -26,10 +26,12 @@ import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos; import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; - import com.google.protobuf.TextFormat; /** Pipeline Acknowledgment **/ @@ -38,6 +40,21 @@ import com.google.protobuf.TextFormat; public class PipelineAck { PipelineAckProto proto; public final static long UNKOWN_SEQNO = -2; + final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type + final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type + final static int NUM_OOB_TYPES = OOB_END - OOB_START + 1; + // place holder for timeout value of each OOB type + final static long[] OOB_TIMEOUT; + + static { + OOB_TIMEOUT = new long[NUM_OOB_TYPES]; + HdfsConfiguration conf = new HdfsConfiguration(); + String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY, + DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(","); + for (int i = 0; i < NUM_OOB_TYPES; i++) { + OOB_TIMEOUT[i] = (i < ele.length) ? Long.valueOf(ele[i]) : 0; + } + } /** default constructor **/ public PipelineAck() { @@ -103,14 +120,57 @@ public class PipelineAck { * @return true if all statuses are SUCCESS */ public boolean isSuccess() { - for (DataTransferProtos.Status reply : proto.getStatusList()) { - if (reply != DataTransferProtos.Status.SUCCESS) { + for (Status reply : proto.getStatusList()) { + if (reply != Status.SUCCESS) { return false; } } return true; } - + + /** + * Returns the OOB status if this ack contains one. + * @return null if it is not an OOB ack. + */ + public Status getOOBStatus() { + // Normal data transfer acks will have a valid sequence number, so + // this will return right away in most cases. + if (getSeqno() != UNKOWN_SEQNO) { + return null; + } + for (Status reply : proto.getStatusList()) { + // The following check is valid because protobuf guarantees to + // preserve the ordering of enum elements. + if (reply.getNumber() >= OOB_START && reply.getNumber() <= OOB_END) { + return reply; + } + } + return null; + } + + /** + * Get the timeout to be used for transmitting the OOB type + * @return the timeout in milliseconds + */ + public static long getOOBTimeout(Status status) throws IOException { + int index = status.getNumber() - OOB_START; + if (index >= 0 && index < NUM_OOB_TYPES) { + return OOB_TIMEOUT[index]; + } + // Not an OOB. + throw new IOException("Not an OOB status: " + status); + } + + /** Get the Restart OOB ack status */ + public static Status getRestartOOBStatus() { + return Status.OOB_RESTART; + } + + /** return true if it is the restart OOB status code */ + public static boolean isRestartOOBStatus(Status st) { + return st.equals(Status.OOB_RESTART); + } + /**** Writable interface ****/ public void readFields(InputStream in) throws IOException { proto = PipelineAckProto.parseFrom(vintPrefixed(in)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index c1ed03ceb32..81410b55c75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -723,14 +723,40 @@ class BlockReceiver implements Closeable { } } catch (IOException ioe) { - LOG.info("Exception for " + block, ioe); - throw ioe; + if (datanode.isRestarting()) { + // Do not throw if shutting down for restart. 
Otherwise, it will cause + // premature termination of responder. + LOG.info("Shutting down for restart (" + block + ")."); + } else { + LOG.info("Exception for " + block, ioe); + throw ioe; + } } finally { - if (!responderClosed) { // Abnormal termination of the flow above - IOUtils.closeStream(this); + // Clear the previous interrupt state of this thread. + Thread.interrupted(); + + // If a shutdown for restart was initiated, upstream needs to be notified. + // There is no need to do anything special if the responder was closed + // normally. + if (!responderClosed) { // Data transfer was not complete. if (responder != null) { + // In case this datanode is shutting down for quick restart, + // send a special ack upstream. + if (datanode.isRestarting()) { + try { + ((PacketResponder) responder.getRunnable()). + sendOOBResponse(PipelineAck.getRestartOOBStatus()); + } catch (InterruptedException ie) { + // It is already going down. Ignore this. + } catch (IOException ioe) { + LOG.info("Error sending OOB Ack.", ioe); + // The OOB ack could not be sent. Since the datanode is going + // down, this is ignored. + } + } responder.interrupt(); } + IOUtils.closeStream(this); cleanupBlock(); } if (responder != null) { @@ -744,7 +770,10 @@ class BlockReceiver implements Closeable { } } catch (InterruptedException e) { responder.interrupt(); - throw new IOException("Interrupted receiveBlock"); + // do not throw if shutting down for restart. + if (!datanode.isRestarting()) { + throw new IOException("Interrupted receiveBlock"); + } } responder = null; } @@ -862,6 +891,7 @@ class BlockReceiver implements Closeable { private final PacketResponderType type; /** for log and error messages */ private final String myString; + private boolean sending = false; @Override public String toString() { @@ -887,7 +917,9 @@ class BlockReceiver implements Closeable { } private boolean isRunning() { - return running && datanode.shouldRun; + // When preparing for a restart, it should continue to run until + // interrupted by the receiver thread. + return running && (datanode.shouldRun || datanode.isRestarting()); } /** @@ -903,44 +935,96 @@ class BlockReceiver implements Closeable { if(LOG.isDebugEnabled()) { LOG.debug(myString + ": enqueue " + p); } - synchronized(this) { + synchronized(ackQueue) { if (running) { ackQueue.addLast(p); - notifyAll(); + ackQueue.notifyAll(); + } + } + } + + /** + * Send an OOB response. If all acks have been sent already for the block + * and the responder is about to close, the delivery is not guaranteed. + * This is because the other end can close the connection independently. + * An OOB coming from downstream will be automatically relayed upstream + * by the responder. This method is used only by originating datanode. + * + * @param ackStatus the type of ack to be sent + */ + void sendOOBResponse(final Status ackStatus) throws IOException, + InterruptedException { + if (!running) { + LOG.info("Cannot send OOB response " + ackStatus + + ". Responder not running."); + return; + } + + synchronized(this) { + if (sending) { + wait(PipelineAck.getOOBTimeout(ackStatus)); + // Didn't get my turn in time. Give up. + if (sending) { + throw new IOException("Could not send OOB reponse in time: " + + ackStatus); + } + } + sending = true; + } + + LOG.info("Sending an out of band ack of type " + ackStatus); + try { + sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L, + ackStatus); + } finally { + // Let others send ack. 
Unless there are miltiple OOB send + // calls, there can be only one waiter, the responder thread. + // In any case, only one needs to be notified. + synchronized(this) { + sending = false; + notify(); } } } /** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */ - synchronized Packet waitForAckHead(long seqno) throws InterruptedException { - while (isRunning() && ackQueue.size() == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(myString + ": seqno=" + seqno + - " waiting for local datanode to finish write."); + Packet waitForAckHead(long seqno) throws InterruptedException { + synchronized(ackQueue) { + while (isRunning() && ackQueue.size() == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug(myString + ": seqno=" + seqno + + " waiting for local datanode to finish write."); + } + ackQueue.wait(); } - wait(); + return isRunning() ? ackQueue.getFirst() : null; } - return isRunning() ? ackQueue.getFirst() : null; } /** * wait for all pending packets to be acked. Then shutdown thread. */ @Override - public synchronized void close() { - while (isRunning() && ackQueue.size() != 0) { - try { - wait(); - } catch (InterruptedException e) { - running = false; - Thread.currentThread().interrupt(); + public void close() { + synchronized(ackQueue) { + while (isRunning() && ackQueue.size() != 0) { + try { + ackQueue.wait(); + } catch (InterruptedException e) { + running = false; + Thread.currentThread().interrupt(); + } } + if(LOG.isDebugEnabled()) { + LOG.debug(myString + ": closing"); + } + running = false; + ackQueue.notifyAll(); } - if(LOG.isDebugEnabled()) { - LOG.debug(myString + ": closing"); + + synchronized(this) { + notifyAll(); } - running = false; - notifyAll(); } /** @@ -968,6 +1052,14 @@ class BlockReceiver implements Closeable { if (LOG.isDebugEnabled()) { LOG.debug(myString + " got " + ack); } + // Process an OOB ACK. + Status oobStatus = ack.getOOBStatus(); + if (oobStatus != null) { + LOG.info("Relaying an out of band ack of type " + oobStatus); + sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L, + Status.SUCCESS); + continue; + } seqno = ack.getSeqno(); } if (seqno != PipelineAck.UNKOWN_SEQNO @@ -1025,6 +1117,9 @@ class BlockReceiver implements Closeable { * status back to the client because this datanode has a problem. * The upstream datanode will detect that this datanode is bad, and * rightly so. + * + * The receiver thread can also interrupt this thread for sending + * an out-of-band response upstream. */ LOG.info(myString + ": Thread is interrupted."); running = false; @@ -1094,17 +1189,64 @@ class BlockReceiver implements Closeable { } /** + * The wrapper for the unprotected version. This is only called by + * the responder's run() method. + * * @param ack Ack received from downstream * @param seqno sequence number of ack to be sent upstream * @param totalAckTimeNanos total ack time including all the downstream * nodes * @param offsetInBlock offset in block for the data in packet + * @param myStatus the local ack status */ private void sendAckUpstream(PipelineAck ack, long seqno, long totalAckTimeNanos, long offsetInBlock, Status myStatus) throws IOException { + try { + // Wait for other sender to finish. Unless there is an OOB being sent, + // the responder won't have to wait. 
+ synchronized(this) { + while(sending) { + wait(); + } + sending = true; + } + + try { + if (!running) return; + sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos, + offsetInBlock, myStatus); + } finally { + synchronized(this) { + sending = false; + notify(); + } + } + } catch (InterruptedException ie) { + // The responder was interrupted. Make it go down without + // interrupting the receiver(writer) thread. + running = false; + } + } + + /** + * @param ack Ack received from downstream + * @param seqno sequence number of ack to be sent upstream + * @param totalAckTimeNanos total ack time including all the downstream + * nodes + * @param offsetInBlock offset in block for the data in packet + * @param myStatus the local ack status + */ + private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno, + long totalAckTimeNanos, long offsetInBlock, Status myStatus) + throws IOException { Status[] replies = null; - if (mirrorError) { // ack read error + if (ack == null) { + // A new OOB response is being sent from this node. Regardless of + // downstream nodes, reply should contain one reply. + replies = new Status[1]; + replies[0] = myStatus; + } else if (mirrorError) { // ack read error replies = MIRROR_ERROR_STATUS; } else { short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack @@ -1152,9 +1294,11 @@ class BlockReceiver implements Closeable { * * This should be called only when the ack queue is not empty */ - private synchronized void removeAckHead() { - ackQueue.removeFirst(); - notifyAll(); + private void removeAckHead() { + synchronized(ackQueue) { + ackQueue.removeFirst(); + ackQueue.notifyAll(); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 8ad2072f02c..7e1f53f3ba2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -175,6 +175,8 @@ public class DataNode extends Configured } volatile boolean shouldRun = true; + volatile boolean shutdownForUpgrade = false; + private boolean shutdownInProgress = false; private BlockPoolManager blockPoolManager; volatile FsDatasetSpi data = null; private String clusterId = null; @@ -1190,9 +1192,31 @@ public class DataNode extends Configured // offerServices may be modified. BPOfferService[] bposArray = this.blockPoolManager == null ? null : this.blockPoolManager.getAllNamenodeThreads(); - this.shouldRun = false; + // If shutdown is not for restart, set shouldRun to false early. + if (!shutdownForUpgrade) { + shouldRun = false; + } + + // When shutting down for restart, DataXceiverServer is interrupted + // in order to avoid any further acceptance of requests, but the peers + // for block writes are not closed until the clients are notified. 
+ if (dataXceiverServer != null) { + ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill(); + this.dataXceiverServer.interrupt(); + } + + // Record the time of initial notification + long timeNotified = Time.now(); + + if (localDataXceiverServer != null) { + ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill(); + this.localDataXceiverServer.interrupt(); + } + + // Terminate directory scanner and block scanner shutdownPeriodicScanners(); - + + // Stop the web server if (infoServer != null) { try { infoServer.stop(); @@ -1200,26 +1224,24 @@ public class DataNode extends Configured LOG.warn("Exception shutting down DataNode", e); } } - if (ipcServer != null) { - ipcServer.stop(); - } if (pauseMonitor != null) { pauseMonitor.stop(); } + + // shouldRun is set to false here to prevent certain threads from exiting + // before the restart prep is done. + this.shouldRun = false; - if (dataXceiverServer != null) { - ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill(); - this.dataXceiverServer.interrupt(); - } - if (localDataXceiverServer != null) { - ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill(); - this.localDataXceiverServer.interrupt(); - } // wait for all data receiver threads to exit if (this.threadGroup != null) { int sleepMs = 2; while (true) { - this.threadGroup.interrupt(); + // When shutting down for restart, wait 2.5 seconds before forcing + // termination of receiver threads. + if (!this.shutdownForUpgrade || + (this.shutdownForUpgrade && (Time.now() - timeNotified > 2500))) { + this.threadGroup.interrupt(); + } LOG.info("Waiting for threadgroup to exit, active threads is " + this.threadGroup.activeCount()); if (this.threadGroup.activeCount() == 0) { @@ -1249,7 +1271,13 @@ public class DataNode extends Configured } catch (InterruptedException ie) { } } - + + // IPC server needs to be shutdown late in the process, otherwise + // shutdown command response won't get sent. + if (ipcServer != null) { + ipcServer.stop(); + } + if(blockPoolManager != null) { try { this.blockPoolManager.shutDownAll(bposArray); @@ -1275,6 +1303,11 @@ public class DataNode extends Configured MBeans.unregister(dataNodeInfoBeanName); dataNodeInfoBeanName = null; } + LOG.info("Shutdown complete."); + synchronized(this) { + // Notify the main thread. + notifyAll(); + } } @@ -1775,7 +1808,11 @@ public class DataNode extends Configured && blockPoolManager.getAllNamenodeThreads().length == 0) { shouldRun = false; } - Thread.sleep(2000); + // Terminate if shutdown is complete or 2 seconds after all BPs + // are shutdown. + synchronized(this) { + wait(2000); + } } catch (InterruptedException ex) { LOG.warn("Received exception in Datanode#join: " + ex); } @@ -2411,17 +2448,27 @@ public class DataNode extends Configured } @Override // ClientDatanodeProtocol - public void shutdownDatanode(boolean forUpgrade) throws IOException { + public synchronized void shutdownDatanode(boolean forUpgrade) throws IOException { LOG.info("shutdownDatanode command received (upgrade=" + forUpgrade + "). Shutting down Datanode..."); - // Delay start the shutdown process so that the rpc response can be + // Shutdown can be called only once. + if (shutdownInProgress) { + throw new IOException("Shutdown already in progress."); + } + shutdownInProgress = true; + shutdownForUpgrade = forUpgrade; + + // Asynchronously start the shutdown process so that the rpc response can be // sent back. 
Thread shutdownThread = new Thread() { @Override public void run() { - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { } + if (!shutdownForUpgrade) { + // Delay the shutdown a bit if not doing for restart. + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { } + } shutdown(); } }; @@ -2462,6 +2509,10 @@ public class DataNode extends Configured return bp != null ? bp.isAlive() : false; } + boolean isRestarting() { + return shutdownForUpgrade; + } + /** * A datanode is considered to be fully started if all the BP threads are * alive and all the block pools are initialized. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index da38b91ac13..2a8379234dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -158,8 +158,8 @@ class DataXceiver extends Receiver implements Runnable { int opsProcessed = 0; Op op = null; - dataXceiverServer.addPeer(peer); try { + dataXceiverServer.addPeer(peer, Thread.currentThread()); peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout); InputStream input = socketIn; if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index 2755eb415f8..618aaaa6a0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -20,8 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; import java.net.SocketTimeoutException; import java.nio.channels.AsynchronousCloseException; -import java.util.HashSet; -import java.util.Set; +import java.util.HashMap; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; @@ -45,7 +44,8 @@ class DataXceiverServer implements Runnable { private final PeerServer peerServer; private final DataNode datanode; - private final Set peers = new HashSet(); + private final HashMap peers = new HashMap(); + private boolean closed = false; /** * Maximal number of concurrent xceivers per node. @@ -127,7 +127,7 @@ class DataXceiverServer implements Runnable { @Override public void run() { Peer peer = null; - while (datanode.shouldRun) { + while (datanode.shouldRun && !datanode.shutdownForUpgrade) { try { peer = peerServer.accept(); @@ -147,7 +147,7 @@ class DataXceiverServer implements Runnable { } catch (AsynchronousCloseException ace) { // another thread closed our listener socket - that's expected during shutdown, // but not in other circumstances - if (datanode.shouldRun) { + if (datanode.shouldRun && !datanode.shutdownForUpgrade) { LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace); } } catch (IOException ie) { @@ -170,35 +170,82 @@ class DataXceiverServer implements Runnable { datanode.shouldRun = false; } } - synchronized (this) { - for (Peer p : peers) { - IOUtils.cleanup(LOG, p); - } - } + + // Close the server to stop reception of more requests. 
try { peerServer.close(); + closed = true; } catch (IOException ie) { LOG.warn(datanode.getDisplayName() + " :DataXceiverServer: close exception", ie); } + + // if in restart prep stage, notify peers before closing them. + if (datanode.shutdownForUpgrade) { + restartNotifyPeers(); + // Each thread needs some time to process it. If a thread needs + // to send an OOB message to the client, but blocked on network for + // long time, we need to force its termination. + LOG.info("Shutting down DataXceiverServer before restart"); + // Allow roughly up to 2 seconds. + for (int i = 0; getNumPeers() > 0 && i < 10; i++) { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + // ignore + } + } + } + // Close all peers. + closeAllPeers(); } void kill() { - assert datanode.shouldRun == false : - "shoudRun should be set to false before killing"; + assert (datanode.shouldRun == false || datanode.shutdownForUpgrade) : + "shoudRun should be set to false or restarting should be true" + + " before killing"; try { this.peerServer.close(); + this.closed = true; } catch (IOException ie) { LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie); } } - synchronized void addPeer(Peer peer) { - peers.add(peer); + synchronized void addPeer(Peer peer, Thread t) throws IOException { + if (closed) { + throw new IOException("Server closed."); + } + peers.put(peer, t); } synchronized void closePeer(Peer peer) { peers.remove(peer); IOUtils.cleanup(null, peer); } + + // Notify all peers of the shutdown and restart. + // datanode.shouldRun should still be true and datanode.restarting should + // be set true before calling this method. + synchronized void restartNotifyPeers() { + assert (datanode.shouldRun == true && datanode.shutdownForUpgrade); + for (Peer p : peers.keySet()) { + // interrupt each and every DataXceiver thread. + peers.get(p).interrupt(); + } + } + + // Close all peers and clear the map. + synchronized void closeAllPeers() { + LOG.info("Closing all peers."); + for (Peer p : peers.keySet()) { + IOUtils.cleanup(LOG, p); + } + peers.clear(); + } + + // Return the number of peers. 
+ synchronized int getNumPeers() { + return peers.size(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index 528633f935b..28f37729e44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -157,6 +157,10 @@ enum Status { ERROR_ACCESS_TOKEN = 5; CHECKSUM_OK = 6; ERROR_UNSUPPORTED = 7; + OOB_RESTART = 8; // Quick restart + OOB_RESERVED1 = 9; // Reserved + OOB_RESERVED2 = 10; // Reserved + OOB_RESERVED3 = 11; // Reserved } message PipelineAckProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java index 86a9fec48b5..d583bb302ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java @@ -24,8 +24,10 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.io.IOUtils; import org.junit.Assert; @@ -159,4 +161,39 @@ public class TestClientProtocolForPipelineRecovery { } } } + + /** Test recovery on restart OOB message */ + @Test + public void testPipelineRecoveryOnOOB() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + try { + int numDataNodes = 3; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); + cluster.waitActive(); + FileSystem fileSys = cluster.getFileSystem(); + + Path file = new Path("dataprotocol2.dat"); + DFSTestUtil.createFile(fileSys, file, 10240L, (short)2, 0L); + DFSOutputStream out = (DFSOutputStream)(fileSys.append(file). + getWrappedStream()); + out.write(1); + out.hflush(); + + DFSAdmin dfsadmin = new DFSAdmin(conf); + DataNode dn = cluster.getDataNodes().get(0); + final String dnAddr = dn.getDatanodeId().getIpcAddr(false); + // issue shutdown to the datanode. + final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" }; + Assert.assertEquals(0, dfsadmin.run(args1)); + out.close(); + Thread.sleep(3000); + final String[] args2 = {"-getDatanodeInfo", dnAddr }; + Assert.assertEquals(-1, dfsadmin.run(args2)); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } }
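
Notes on the mechanisms introduced above.

The per-type OOB timeouts come from the new dfs.datanode.oob.timeout-ms key, whose default "1500,0,0,0" supplies one value per reserved OOB status. Below is a minimal, standalone sketch of the parsing done in PipelineAck's static initializer; it deliberately avoids the Hadoop classes, and the hard-coded status numbers (8 through 11) are taken from the new enum values in datatransfer.proto.

public class OobTimeoutSketch {
  static final int OOB_START = 8;               // Status.OOB_RESTART_VALUE
  static final int OOB_END = 11;                // Status.OOB_RESERVED3_VALUE
  static final int NUM_OOB_TYPES = OOB_END - OOB_START + 1;

  /** Parse the configured value; missing entries fall back to 0 ms. */
  static long[] parseOobTimeouts(String configured) {
    String[] ele = configured.split(",");
    long[] timeouts = new long[NUM_OOB_TYPES];
    for (int i = 0; i < NUM_OOB_TYPES; i++) {
      timeouts[i] = (i < ele.length) ? Long.parseLong(ele[i].trim()) : 0;
    }
    return timeouts;
  }

  /** Look up the timeout for a status number, as getOOBTimeout() does. */
  static long timeoutFor(int statusNumber, long[] timeouts) {
    int index = statusNumber - OOB_START;
    if (index < 0 || index >= NUM_OOB_TYPES) {
      throw new IllegalArgumentException("Not an OOB status: " + statusNumber);
    }
    return timeouts[index];
  }

  public static void main(String[] args) {
    long[] timeouts = parseOobTimeouts("1500,0,0,0");
    System.out.println("OOB_RESTART timeout = " + timeoutFor(8, timeouts) + " ms");
  }
}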
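
An ack is treated as out of band when it carries the sentinel sequence number and at least one status in the reserved OOB range, which is what the new PipelineAck.getOOBStatus() checks before normal ack processing. A standalone sketch of that classification follows, using a stand-in Ack type instead of PipelineAckProto.

public class OobClassifySketch {
  static final long UNKNOWN_SEQNO = -2;   // PipelineAck.UNKOWN_SEQNO in the real code
  static final int OOB_START = 8;         // OOB_RESTART
  static final int OOB_END = 11;          // OOB_RESERVED3

  static class Ack {
    final long seqno;
    final int[] statusNumbers;
    Ack(long seqno, int... statusNumbers) {
      this.seqno = seqno;
      this.statusNumbers = statusNumbers;
    }
  }

  /** Returns the OOB status number, or -1 for a normal data-transfer ack. */
  static int getOobStatus(Ack ack) {
    if (ack.seqno != UNKNOWN_SEQNO) {
      return -1;                          // fast path for normal acks
    }
    for (int s : ack.statusNumbers) {
      if (s >= OOB_START && s <= OOB_END) {
        return s;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    System.out.println(getOobStatus(new Ack(-2, 8)));   // 8: OOB_RESTART
    System.out.println(getOobStatus(new Ack(17, 0)));   // -1: ordinary ack
  }
}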
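
Inside PacketResponder, regular acks and OOB acks are serialized with a "sending" flag: the OOB path waits at most the configured timeout for its turn and gives up, while the normal ack path waits until the flag clears. The sketch below shows that handshake in isolation, with illustrative method names rather than the real sendAckUpstream/sendOOBResponse signatures.

public class SingleSenderSketch {
  private boolean sending = false;

  /** Regular ack path: wait until no other send is in flight. */
  void sendAck(String ack) throws InterruptedException {
    synchronized (this) {
      while (sending) {
        wait();
      }
      sending = true;
    }
    try {
      System.out.println("ack sent upstream: " + ack);
    } finally {
      synchronized (this) {
        sending = false;
        notify();           // at most one waiter needs to be woken
      }
    }
  }

  /** OOB path: bounded wait; fail if the turn does not come in time. */
  void sendOob(String oob, long timeoutMs) throws InterruptedException {
    synchronized (this) {
      if (sending) {
        wait(timeoutMs);
        if (sending) {
          throw new IllegalStateException("Could not send OOB response in time: " + oob);
        }
      }
      sending = true;
    }
    try {
      System.out.println("OOB ack sent upstream: " + oob);
    } finally {
      synchronized (this) {
        sending = false;
        notify();
      }
    }
  }
}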
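
On the DataNode side, shutdown-for-restart reorders the teardown: stop accepting new xceivers first, give receiver threads a grace period (about 2.5 seconds from the initial notification) to emit the restart OOB ack before interrupting them, and stop the IPC server last so the shutdownDatanode RPC response still reaches the caller. The sketch below captures that ordering only; the timings and the back-off step are simplified and should not be read as the exact loop in DataNode.shutdown().

public class ShutdownOrderSketch {
  volatile boolean restarting = true;

  void shutdown(ThreadGroup receivers) throws InterruptedException {
    // Time at which peers were first told to stop (xceiver servers killed).
    long timeNotified = System.currentTimeMillis();
    // ... periodic scanners and the web server are stopped here ...
    int sleepMs = 2;
    while (receivers.activeCount() > 0) {
      // When restarting, only force-interrupt receivers after the grace period.
      if (!restarting || System.currentTimeMillis() - timeNotified > 2500) {
        receivers.interrupt();
      }
      Thread.sleep(sleepMs);
      sleepMs = Math.min(1000, sleepMs * 2);   // simplified back-off
    }
    // ... the IPC server is stopped after this point, so the shutdown RPC
    // response can still be delivered to the admin client ...
  }
}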
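
DataXceiverServer follows a notify-then-force pattern for its writers: interrupt every peer's thread so it can relay OOB_RESTART upstream, poll for up to roughly two seconds, then close whatever remains. A simplified sketch, with plain map bookkeeping standing in for the real Peer handling:

import java.util.HashMap;
import java.util.Map;

public class RestartNotifySketch {
  private final Map<String, Thread> peers = new HashMap<>();

  synchronized void addPeer(String peer, Thread t) {
    peers.put(peer, t);
  }

  synchronized void closePeer(String peer) {
    peers.remove(peer);
  }

  synchronized int getNumPeers() {
    return peers.size();
  }

  /** Interrupt every writer thread so it can send the restart OOB ack. */
  synchronized void restartNotifyPeers() {
    for (Thread t : peers.values()) {
      t.interrupt();
    }
  }

  /** Give peers about 2 seconds to drain before forcing them closed. */
  void shutdownForRestart() throws InterruptedException {
    restartNotifyPeers();
    for (int i = 0; getNumPeers() > 0 && i < 10; i++) {
      Thread.sleep(200);
    }
    synchronized (this) {
      peers.clear();       // the real code also closes each remaining Peer here
    }
  }
}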
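
As exercised by the new testPipelineRecoveryOnOOB, the feature is driven through the admin interface: "dfsadmin -shutdownDatanode <host:port> upgrade" triggers the OOB-notifying shutdown, the client writing through that datanode recovers the pipeline on the OOB_RESTART ack and can close the file successfully, and "dfsadmin -getDatanodeInfo <host:port>" is expected to return a non-zero exit code while the datanode is down.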