diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
index 5621de037d3..a60c17d101b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -140,6 +140,13 @@ HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
    */
   void shutdownDatanode(boolean forUpgrade) throws IOException;
 
+  /**
+   * Evict clients that are writing to this datanode.
+   *
+   * @throws IOException on error, or if the caller lacks superuser privilege
+   */
+  void evictWriters() throws IOException;
+
   /**
    * Obtains datanode info
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 3782660ce41..a2e0b35a6a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@@ -104,6 +105,8 @@ public class ClientDatanodeProtocolTranslatorPB implements
   private static final GetBalancerBandwidthRequestProto
       VOID_GET_BALANCER_BANDWIDTH =
       GetBalancerBandwidthRequestProto.newBuilder().build();
+  private static final EvictWritersRequestProto VOID_EVICT_WRITERS =
+      EvictWritersRequestProto.newBuilder().build();
 
   public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
       Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@@ -286,6 +289,15 @@ public void shutdownDatanode(boolean forUpgrade) throws IOException {
     }
   }
 
+  @Override
+  public void evictWriters() throws IOException {
+    try {
+      rpcProxy.evictWriters(NULL_CONTROLLER, VOID_EVICT_WRITERS);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public DatanodeLocalInfo getDatanodeInfo() throws IOException {
     GetDatanodeInfoResponseProto response;
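Note: the client-side path above deliberately mirrors the existing shutdownDatanode() plumbing: a void request proto, a one-line proxy call, and ServiceException unwrapping. For orientation, a minimal sketch of a caller follows; the class name, the target address, and the direct use of the translator's InetSocketAddress constructor are illustrative assumptions rather than part of this patch (DFSAdmin, later in this change, goes through its getDataNodeProxy() helper instead).

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;

// Hypothetical caller of the new RPC. The host:port is a placeholder for a
// datanode's IPC address; the server still enforces the superuser check.
public class EvictWritersExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    InetSocketAddress dnIpcAddr =
        NetUtils.createSocketAddr("dn1.example.com:50020");
    ClientDatanodeProtocol proxy = new ClientDatanodeProtocolTranslatorPB(
        dnIpcAddr, UserGroupInformation.getCurrentUser(), conf,
        NetUtils.getDefaultSocketFactory(conf));
    proxy.evictWriters(); // maps to the evictWriters RPC added in the .proto
  }
}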
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
index 467cbe0850f..4e185997372 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
@@ -140,6 +140,13 @@ message ShutdownDatanodeRequestProto {
 message ShutdownDatanodeResponseProto {
 }
 
+/** Tell datanode to evict active clients that are writing */
+message EvictWritersRequestProto {
+}
+
+message EvictWritersResponseProto {
+}
+
 /**
  * Ping datanode for liveness and quick info
  */
@@ -209,6 +216,9 @@ service ClientDatanodeProtocolService {
   rpc shutdownDatanode(ShutdownDatanodeRequestProto)
       returns(ShutdownDatanodeResponseProto);
 
+  rpc evictWriters(EvictWritersRequestProto)
+      returns(EvictWritersResponseProto);
+
   rpc getDatanodeInfo(GetDatanodeInfoRequestProto)
       returns(GetDatanodeInfoResponseProto);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index 5324ab66d32..e712a652d75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -29,6 +29,8 @@
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@@ -79,6 +81,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       StartReconfigurationResponseProto.newBuilder().build();
   private final static TriggerBlockReportResponseProto TRIGGER_BLOCK_REPORT_RESP =
       TriggerBlockReportResponseProto.newBuilder().build();
+  private final static EvictWritersResponseProto EVICT_WRITERS_RESP =
+      EvictWritersResponseProto.newBuilder().build();
 
   private final ClientDatanodeProtocol impl;
 
@@ -186,6 +190,17 @@ public ShutdownDatanodeResponseProto shutdownDatanode(
     return SHUTDOWN_DATANODE_RESP;
   }
 
+  @Override
+  public EvictWritersResponseProto evictWriters(RpcController unused,
+      EvictWritersRequestProto request) throws ServiceException {
+    try {
+      impl.evictWriters();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return EVICT_WRITERS_RESP;
+  }
+
   public GetDatanodeInfoResponseProto getDatanodeInfo(RpcController unused,
       GetDatanodeInfoRequestProto request) throws ServiceException {
     GetDatanodeInfoResponseProto res;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index fb0c1c5b1c3..8f9138ca6bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -889,6 +889,11 @@ private void manageWriterOsCache(long offsetInBlock) {
   }
 
   public void sendOOB() throws IOException, InterruptedException {
+    // Datanode-initiated writes (e.g. replication) have no client
+    // pipeline to notify with a restart OOB ack.
+    if (isDatanode) {
+      return;
+    }
     ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
         .getRestartOOBStatus());
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b6a7afe0f0c..9ad6214f35d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2948,6 +2948,13 @@ public synchronized void shutdownDatanode(boolean forUpgrade) throws IOException {
     shutdownThread.start();
   }
 
+  @Override //ClientDatanodeProtocol
+  public void evictWriters() throws IOException {
+    checkSuperuserPrivilege();
+    LOG.info("Evicting all writers.");
+    xserver.stopWriters();
+  }
+
   @Override //ClientDatanodeProtocol
   public DatanodeLocalInfo getDatanodeInfo() {
     long uptime = ManagementFactory.getRuntimeMXBean().getUptime()/1000;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index c23e8a76b54..e5ae98f320c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -115,6 +115,7 @@ class DataXceiver extends Receiver implements Runnable {
   private BlockReceiver blockReceiver = null;
   private final int ioFileBufferSize;
   private final int smallBufferSize;
+  private Thread xceiver = null;
 
   /**
    * Client Name used in previous operation. Not available on first request
@@ -177,9 +178,38 @@ private OutputStream getOutputStream() {
   }
 
   public void sendOOB() throws IOException, InterruptedException {
+    BlockReceiver br = getCurrentBlockReceiver();
+    if (br == null) {
+      return;
+    }
+    // This doesn't need to be in a critical section. Although the client
+    // can reuse the connection to issue a different request, trying to send
+    // an OOB through the recently closed block receiver is harmless.
     LOG.info("Sending OOB to peer: " + peer);
-    if(blockReceiver!=null)
-      blockReceiver.sendOOB();
+    br.sendOOB();
+  }
+
+  public void stopWriter() {
+    // We want to interrupt the xceiver only when it is serving writes.
+    synchronized(this) {
+      if (getCurrentBlockReceiver() == null) {
+        return;
+      }
+      xceiver.interrupt();
+    }
+    LOG.info("Stopped the writer: " + peer);
+  }
+
+  /**
+   * blockReceiver is updated in multiple places; use the synchronized
+   * setter and getter.
+   */
+  private synchronized void setCurrentBlockReceiver(BlockReceiver br) {
+    blockReceiver = br;
+  }
+
+  private synchronized BlockReceiver getCurrentBlockReceiver() {
+    return blockReceiver;
   }
 
   /**
@@ -191,6 +221,9 @@ public void run() {
     Op op = null;
 
     try {
+      synchronized(this) {
+        xceiver = Thread.currentThread();
+      }
       dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
@@ -678,12 +711,12 @@ public void writeBlock(final ExtendedBlock block,
       if (isDatanode ||
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
-        blockReceiver = getBlockReceiver(block, storageType, in,
+        setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
-            cachingStrategy, allowLazyPersist, pinning);
+            cachingStrategy, allowLazyPersist, pinning));
         replica = blockReceiver.getReplica();
       } else {
         replica = datanode.data.recoverClose(
@@ -852,7 +885,7 @@ public void writeBlock(final ExtendedBlock block,
       IOUtils.closeStream(replyOut);
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeStream(blockReceiver);
-      blockReceiver = null;
+      setCurrentBlockReceiver(null);
     }
 
     //update metrics
@@ -1079,7 +1112,6 @@ public void replaceBlock(final ExtendedBlock block,
     DataOutputStream proxyOut = null;
     Status opStatus = SUCCESS;
     String errMsg = null;
-    BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
     boolean IoeDuringCopyBlockOperation = false;
     try {
@@ -1138,11 +1170,11 @@ public void replaceBlock(final ExtendedBlock block,
       DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
           checksumInfo.getChecksum());
       // open a block receiver and check if the block does not exist
-      blockReceiver = getBlockReceiver(block, storageType,
+      setCurrentBlockReceiver(getBlockReceiver(block, storageType,
           proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
           null, 0, 0, 0, "", null, datanode, remoteChecksum,
-          CachingStrategy.newDropBehind(), false, false);
+          CachingStrategy.newDropBehind(), false, false));
 
       // receive a block
       blockReceiver.receiveBlock(null, null, replyOut, null,
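The locking discipline above is worth restating: stopWriter() reads the current block receiver and interrupts the thread inside a single synchronized block, so the check and the interrupt are atomic with respect to setCurrentBlockReceiver(). Without that, an xceiver that had just finished its write and nulled the receiver could be interrupted while serving an unrelated request on the reused connection. A stripped-down model of the pattern follows; all names are invented for illustration and are not Hadoop classes.

// Toy model of DataXceiver's check-then-interrupt discipline.
class Worker {
  private Thread serving;       // thread running this worker, set at startup
  private Object currentWrite;  // non-null only while a write is in flight

  synchronized void start() { serving = Thread.currentThread(); }
  synchronized void beginWrite(Object w) { currentWrite = w; }
  synchronized void endWrite() { currentWrite = null; }

  // The lock makes "is a write active?" and "interrupt" atomic with
  // respect to endWrite(), so a worker that has already cleared its
  // write is never interrupted while doing something else.
  synchronized void cancelWrite() {
    if (currentWrite != null) {
      serving.interrupt();
    }
  }
}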
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 10945e71682..126d5b10d42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -256,6 +256,12 @@ public synchronized void sendOOBToPeers() {
       }
     }
   }
+
+  public synchronized void stopWriters() {
+    for (Peer p : peers.keySet()) {
+      peersXceiver.get(p).stopWriter();
+    }
+  }
 
   // Notify all peers of the shutdown and restart.
   // datanode.shouldRun should still be true and datanode.restarting should
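Like sendOOBToPeers() just above it, stopWriters() is synchronized on the server object, so the peer maps cannot change while the fan-out iterates them; each DataXceiver then decides under its own lock whether it is actually mid-write. Continuing the toy model from the previous note (again with invented names, reusing the Worker sketch):

import java.util.HashMap;
import java.util.Map;

// Toy model of DataXceiverServer's eviction fan-out.
class WorkerServer {
  private final Map<String, Worker> workers = new HashMap<>();

  synchronized void add(String peer, Worker w) { workers.put(peer, w); }
  synchronized void remove(String peer) { workers.remove(peer); }

  // Mirrors stopWriters(): the server lock keeps the map stable during
  // iteration; cancelWrite() is a no-op for workers not serving a write.
  synchronized void stopWriters() {
    for (Worker w : workers.values()) {
      w.cancelWrite();
    }
  }
}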
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 44dfcd9d8f6..56bf5df467d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -1062,6 +1062,10 @@ private void printHelp(String cmd) {
       + "\tclients will timeout and ignore the datanode. In such case, the\n"
       + "\tfast start-up mode will also be disabled.\n";
 
+    String evictWriters = "-evictWriters <datanode_host:ipc_port>\n"
+      + "\tMake the datanode evict all clients that are writing a block.\n"
+      + "\tThis is useful if decommissioning is hung due to slow writers.\n";
+
     String getDatanodeInfo = "-getDatanodeInfo <datanode_host:ipc_port>\n"
       + "\tGet the information about the given datanode. This command can\n"
      + "\tbe used for checking if a datanode is alive.\n";
@@ -1131,6 +1135,8 @@ private void printHelp(String cmd) {
       System.out.println(disallowSnapshot);
     } else if ("shutdownDatanode".equalsIgnoreCase(cmd)) {
       System.out.println(shutdownDatanode);
+    } else if ("evictWriters".equalsIgnoreCase(cmd)) {
+      System.out.println(evictWriters);
     } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
       System.out.println(getDatanodeInfo);
     } else if ("help".equals(cmd)) {
@@ -1165,6 +1171,7 @@ private void printHelp(String cmd) {
       System.out.println(allowSnapshot);
       System.out.println(disallowSnapshot);
       System.out.println(shutdownDatanode);
+      System.out.println(evictWriters);
       System.out.println(getDatanodeInfo);
       System.out.println(triggerBlockReport);
       System.out.println(help);
@@ -2020,6 +2027,8 @@ public int run(String[] argv) throws Exception {
       exitCode = fetchImage(argv, i);
     } else if ("-shutdownDatanode".equals(cmd)) {
       exitCode = shutdownDatanode(argv, i);
+    } else if ("-evictWriters".equals(cmd)) {
+      exitCode = evictWriters(argv, i);
     } else if ("-getDatanodeInfo".equals(cmd)) {
       exitCode = getDatanodeInfo(argv, i);
     } else if ("-reconfig".equals(cmd)) {
@@ -2144,6 +2153,19 @@ private int shutdownDatanode(String[] argv, int i) throws IOException {
     return 0;
   }
 
+  private int evictWriters(String[] argv, int i) throws IOException {
+    final String dn = argv[i];
+    ClientDatanodeProtocol dnProxy = getDataNodeProxy(dn);
+    try {
+      dnProxy.evictWriters();
+      System.out.println("Requested writer eviction to datanode " + dn);
+    } catch (IOException ioe) {
+      System.err.println("Evict writers failed: " + ioe.getMessage());
+      return -1;
+    }
+    return 0;
+  }
+
   private int getDatanodeInfo(String[] argv, int i) throws IOException {
     ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
     try {
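For operators: the new subcommand takes the datanode's IPC address, so a hypothetical invocation looks like "hdfs dfsadmin -evictWriters dn1.example.com:50020" (50020 is the usual default datanode IPC port in this branch; the call requires HDFS superuser privilege, since the datanode checks it server-side). The same entry point can be driven programmatically, which is exactly what the new test below does:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;

// Hypothetical driver; the host:port placeholder must point at a live DN.
public class EvictWritersCli {
  public static void main(String[] args) throws Exception {
    int rc = new DFSAdmin(new Configuration())
        .run(new String[] {"-evictWriters", "dn1.example.com:50020"});
    System.exit(rc);
  }
}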
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 0eeb3b78edf..5e320fa29a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -271,6 +271,55 @@ public void testPipelineRecoveryOnOOB() throws Exception {
     }
   }
 
+  /**
+   * Test that the writer is kicked out of a datanode.
+   */
+  @Test
+  public void testEvictWriter() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(3)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      Path file = new Path("testEvictWriter.dat");
+      FSDataOutputStream out = fs.create(file, (short)2);
+      out.write(0x31);
+      out.hflush();
+
+      // get nodes in the pipeline
+      DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
+      DatanodeInfo[] nodes = dfsOut.getPipeline();
+      Assert.assertEquals(2, nodes.length);
+      String dnAddr = nodes[1].getIpcAddr(false);
+
+      // evict the writer from the second datanode and wait until
+      // the pipeline is rebuilt.
+      DFSAdmin dfsadmin = new DFSAdmin(conf);
+      final String[] args1 = {"-evictWriters", dnAddr};
+      Assert.assertEquals(0, dfsadmin.run(args1));
+      out.write(0x31);
+      out.hflush();
+
+      // get the new pipeline and check that the evicted node is not in it.
+      nodes = dfsOut.getPipeline();
+      try {
+        Assert.assertTrue(nodes.length > 0);
+        for (int i = 0; i < nodes.length; i++) {
+          Assert.assertFalse(dnAddr.equals(nodes[i].getIpcAddr(false)));
+        }
+      } finally {
+        out.close();
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /** Test restart timeout */
   @Test
   public void testPipelineRecoveryOnRestartFailure() throws Exception {