From 497c65ad0cb1871af565937c8b14d174ec3c385c Mon Sep 17 00:00:00 2001
From: Eric Payne
Date: Wed, 6 Apr 2016 20:20:14 +0000
Subject: [PATCH] HDFS-9945. Datanode command for evicting writers.
 Contributed by Kihwal Lee

(cherry picked from commit aede8c10ecad4f2a8802a834e4bd0b8286cebade)
---
 .../hdfs/protocol/ClientDatanodeProtocol.java |  7 +++
 .../ClientDatanodeProtocolTranslatorPB.java   | 12 +++++
 .../main/proto/ClientDatanodeProtocol.proto   | 10 ++++
 ...atanodeProtocolServerSideTranslatorPB.java | 15 ++++++
 .../hdfs/server/datanode/BlockReceiver.java   |  3 ++
 .../hadoop/hdfs/server/datanode/DataNode.java |  7 +++
 .../hdfs/server/datanode/DataXceiver.java     | 48 +++++++++++++++---
 .../server/datanode/DataXceiverServer.java    |  6 +++
 .../apache/hadoop/hdfs/tools/DFSAdmin.java    | 21 ++++++++
 ...TestClientProtocolForPipelineRecovery.java | 49 +++++++++++++++++++
 10 files changed, 170 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
index 5621de037d3..a60c17d101b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -140,6 +140,13 @@ public interface ClientDatanodeProtocol {
    */
   void shutdownDatanode(boolean forUpgrade) throws IOException;
 
+  /**
+   * Evict clients that are writing to a datanode.
+   *
+   * @throws IOException
+   */
+  void evictWriters() throws IOException;
+
   /**
    * Obtains datanode info
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 31315c49da4..ec6b0494d4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@@ -110,6 +111,8 @@ public class ClientDatanodeProtocolTranslatorPB implements
   private static final GetBalancerBandwidthRequestProto
       VOID_GET_BALANCER_BANDWIDTH =
       GetBalancerBandwidthRequestProto.newBuilder().build();
+  private final static EvictWritersRequestProto VOID_EVICT_WRITERS =
+      EvictWritersRequestProto.newBuilder().build();
 
   public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
       Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@@ -292,6 +295,15 @@ public class ClientDatanodeProtocolTranslatorPB implements
     }
   }
 
+  @Override
+  public void evictWriters() throws IOException {
+    try {
+      rpcProxy.evictWriters(NULL_CONTROLLER, VOID_EVICT_WRITERS);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public DatanodeLocalInfo getDatanodeInfo() throws IOException {
     GetDatanodeInfoResponseProto response;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
index 83f66d9f5cf..d0c83b5b7b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
@@ -139,6 +139,13 @@ message ShutdownDatanodeRequestProto {
 message ShutdownDatanodeResponseProto {
 }
 
+/** Tell datanode to evict active clients that are writing */
+message EvictWritersRequestProto {
+}
+
+message EvictWritersResponseProto {
+}
+
 /**
  * Ping datanode for liveness and quick info
  */
@@ -239,6 +246,9 @@ service ClientDatanodeProtocolService {
   rpc shutdownDatanode(ShutdownDatanodeRequestProto)
       returns(ShutdownDatanodeResponseProto);
 
+  rpc evictWriters(EvictWritersRequestProto)
+      returns(EvictWritersResponseProto);
+
   rpc getDatanodeInfo(GetDatanodeInfoRequestProto)
       returns(GetDatanodeInfoResponseProto);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index ff18f6d8442..fbf57972c1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@@ -83,6 +85,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       StartReconfigurationResponseProto.newBuilder().build();
   private final static TriggerBlockReportResponseProto TRIGGER_BLOCK_REPORT_RESP =
       TriggerBlockReportResponseProto.newBuilder().build();
+  private final static EvictWritersResponseProto EVICT_WRITERS_RESP =
+      EvictWritersResponseProto.newBuilder().build();
 
   private final ClientDatanodeProtocol impl;
 
@@ -190,6 +194,17 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
     return SHUTDOWN_DATANODE_RESP;
   }
 
+  @Override
+  public EvictWritersResponseProto evictWriters(RpcController unused,
+      EvictWritersRequestProto request) throws ServiceException {
+    try {
+      impl.evictWriters();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return EVICT_WRITERS_RESP;
+  }
+
   public GetDatanodeInfoResponseProto getDatanodeInfo(RpcController unused,
       GetDatanodeInfoRequestProto request) throws ServiceException {
     GetDatanodeInfoResponseProto res;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index bbb90962102..a60d1886409 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -878,6 +878,9 @@ class BlockReceiver implements Closeable {
   }
 
   public void sendOOB() throws IOException, InterruptedException {
+    if (isDatanode) {
+      return;
+    }
     ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
         .getRestartOOBStatus());
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 7ee2971035a..56bec715300 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2935,6 +2935,13 @@ public class DataNode extends ReconfigurableBase
     shutdownThread.start();
   }
 
+  @Override //ClientDatanodeProtocol
+  public void evictWriters() throws IOException {
+    checkSuperuserPrivilege();
+    LOG.info("Evicting all writers.");
+    xserver.stopWriters();
+  }
+
   @Override //ClientDatanodeProtocol
   public DatanodeLocalInfo getDatanodeInfo() {
     long uptime = ManagementFactory.getRuntimeMXBean().getUptime()/1000;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index c23e8a76b54..e5ae98f320c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -115,6 +115,7 @@ class DataXceiver extends Receiver implements Runnable {
   private BlockReceiver blockReceiver = null;
   private final int ioFileBufferSize;
   private final int smallBufferSize;
+  private Thread xceiver = null;
 
   /**
    * Client Name used in previous operation. Not available on first request
@@ -177,9 +178,38 @@ class DataXceiver extends Receiver implements Runnable {
   }
 
   public void sendOOB() throws IOException, InterruptedException {
+    BlockReceiver br = getCurrentBlockReceiver();
+    if (br == null) {
+      return;
+    }
+    // This doesn't need to be in a critical section. Although the client
+    // can reuse the connection to issue a different request, trying to send
+    // an OOB through the recently closed block receiver is harmless.
     LOG.info("Sending OOB to peer: " + peer);
-    if(blockReceiver!=null)
-      blockReceiver.sendOOB();
+    br.sendOOB();
+  }
+
+  public void stopWriter() {
+    // We want to interrupt the xceiver only when it is serving writes.
+    synchronized(this) {
+      if (getCurrentBlockReceiver() == null) {
+        return;
+      }
+      xceiver.interrupt();
+    }
+    LOG.info("Stopped the writer: " + peer);
+  }
+
+  /**
+   * blockReceiver is updated at multiple places. Use the synchronized setter
+   * and getter.
+   */
+  private synchronized void setCurrentBlockReceiver(BlockReceiver br) {
+    blockReceiver = br;
+  }
+
+  private synchronized BlockReceiver getCurrentBlockReceiver() {
+    return blockReceiver;
   }
 
   /**
@@ -191,6 +221,9 @@
     Op op = null;
 
     try {
+      synchronized(this) {
+        xceiver = Thread.currentThread();
+      }
       dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
@@ -678,12 +711,12 @@
     if (isDatanode ||
         stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
       // open a block receiver
-      blockReceiver = getBlockReceiver(block, storageType, in,
+      setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
           peer.getRemoteAddressString(),
          peer.getLocalAddressString(),
          stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
          clientname, srcDataNode, datanode, requestedChecksum,
-          cachingStrategy, allowLazyPersist, pinning);
+          cachingStrategy, allowLazyPersist, pinning));
       replica = blockReceiver.getReplica();
     } else {
       replica = datanode.data.recoverClose(
@@ -852,7 +885,7 @@
       IOUtils.closeStream(replyOut);
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeStream(blockReceiver);
-      blockReceiver = null;
+      setCurrentBlockReceiver(null);
     }
 
     //update metrics
@@ -1079,7 +1112,6 @@
     DataOutputStream proxyOut = null;
     Status opStatus = SUCCESS;
     String errMsg = null;
-    BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
     boolean IoeDuringCopyBlockOperation = false;
     try {
@@ -1138,11 +1170,11 @@
       DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
           checksumInfo.getChecksum());
       // open a block receiver and check if the block does not exist
-      blockReceiver = getBlockReceiver(block, storageType,
+      setCurrentBlockReceiver(getBlockReceiver(block, storageType,
          proxyReply, proxySock.getRemoteSocketAddress().toString(),
          proxySock.getLocalSocketAddress().toString(),
          null, 0, 0, 0, "", null, datanode, remoteChecksum,
-          CachingStrategy.newDropBehind(), false, false);
+          CachingStrategy.newDropBehind(), false, false));
 
       // receive a block
       blockReceiver.receiveBlock(null, null, replyOut, null,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 8152e6ffb0e..78a07c7663c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -256,6 +256,12 @@
       }
     }
   }
+
+  public synchronized void stopWriters() {
+    for (Peer p : peers.keySet()) {
+      peersXceiver.get(p).stopWriter();
+    }
+  }
 
   // Notify all peers of the shutdown and restart.
   // datanode.shouldRun should still be true and datanode.restarting should
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 8e8930460a0..96b75423835 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -1059,6 +1059,10 @@ public class DFSAdmin extends FsShell {
       + "\tclients will timeout and ignore the datanode. In such case, the\n"
       + "\tfast start-up mode will also be disabled.\n";
 
+    String evictWriters = "-evictWriters <datanode_host:ipc_port>\n"
+        + "\tMake the datanode evict all clients that are writing a block.\n"
+        + "\tThis is useful if decommissioning is hung due to slow writers.\n";
+
     String getDatanodeInfo = "-getDatanodeInfo <datanode_host:ipc_port>\n"
       + "\tGet the information about the given datanode. This command can\n"
       + "\tbe used for checking if a datanode is alive.\n";
@@ -1128,6 +1132,8 @@
       System.out.println(disallowSnapshot);
     } else if ("shutdownDatanode".equalsIgnoreCase(cmd)) {
       System.out.println(shutdownDatanode);
+    } else if ("evictWriters".equalsIgnoreCase(cmd)) {
+      System.out.println(evictWriters);
     } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
       System.out.println(getDatanodeInfo);
     } else if ("help".equals(cmd)) {
@@ -1162,6 +1168,7 @@
     System.out.println(allowSnapshot);
     System.out.println(disallowSnapshot);
     System.out.println(shutdownDatanode);
+    System.out.println(evictWriters);
     System.out.println(getDatanodeInfo);
     System.out.println(triggerBlockReport);
     System.out.println(help);
@@ -1935,6 +1942,8 @@
       exitCode = fetchImage(argv, i);
     } else if ("-shutdownDatanode".equals(cmd)) {
       exitCode = shutdownDatanode(argv, i);
+    } else if ("-evictWriters".equals(cmd)) {
+      exitCode = evictWriters(argv, i);
     } else if ("-getDatanodeInfo".equals(cmd)) {
       exitCode = getDatanodeInfo(argv, i);
     } else if ("-reconfig".equals(cmd)) {
@@ -2042,6 +2051,18 @@
     return 0;
   }
 
+  private int evictWriters(String[] argv, int i) throws IOException {
+    final String dn = argv[i];
+    ClientDatanodeProtocol dnProxy = getDataNodeProxy(dn);
+    try {
+      dnProxy.evictWriters();
+      System.out.println("Requested writer eviction on datanode " + dn);
+    } catch (IOException ioe) {
+      return -1;
+    }
+    return 0;
+  }
+
   private int getDatanodeInfo(String[] argv, int i) throws IOException {
     ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 0eeb3b78edf..5e320fa29a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -271,6 +271,55 @@
     }
   }
 
+  /**
+   * Test that the writer is kicked out of a node.
+   */
+  @Test
+  public void testEvictWriter() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(3)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      Path file = new Path("testEvictWriter.dat");
+      FSDataOutputStream out = fs.create(file, (short)2);
+      out.write(0x31);
+      out.hflush();
+
+      // get nodes in the pipeline
+      DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
+      DatanodeInfo[] nodes = dfsOut.getPipeline();
+      Assert.assertEquals(2, nodes.length);
+      String dnAddr = nodes[1].getIpcAddr(false);
+
+      // evict the writer from the second datanode and wait until
+      // the pipeline is rebuilt.
+      DFSAdmin dfsadmin = new DFSAdmin(conf);
+      final String[] args1 = {"-evictWriters", dnAddr};
+      Assert.assertEquals(0, dfsadmin.run(args1));
+      out.write(0x31);
+      out.hflush();
+
+      // get the new pipeline and check that the evicted node is not in it.
+      nodes = dfsOut.getPipeline();
+      try {
+        Assert.assertTrue(nodes.length > 0);
+        for (int i = 0; i < nodes.length; i++) {
+          Assert.assertFalse(dnAddr.equals(nodes[i].getIpcAddr(false)));
+        }
+      } finally {
+        out.close();
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /** Test restart timeout */
   @Test
   public void testPipelineRecoveryOnRestartFailure() throws Exception {
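
Usage note (editorial addition, not part of the committed patch): with this patch applied, eviction is driven through dfsadmin against a datanode's client IPC endpoint, and the RPC requires HDFS superuser privilege (DataNode#evictWriters calls checkSuperuserPrivilege()). A minimal invocation sketch follows; the hostname and port are illustrative placeholders, not values taken from the patch:

    # Evict all active writers on one datanode, e.g. while it is
    # draining for decommission. Run as the HDFS superuser.
    $ hdfs dfsadmin -evictWriters dn1.example.com:50020
    Requested writer eviction on datanode dn1.example.com:50020

The eviction is one-shot rather than a persistent state: DataXceiverServer#stopWriters() interrupts only the xceiver threads that currently hold a BlockReceiver, the affected clients see the write fail and run ordinary pipeline recovery, and, as the new testEvictWriter shows, the rebuilt pipeline no longer includes the evicted datanode. This is what unsticks a decommission that is hung on slow writers.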