From aede8c10ecad4f2a8802a834e4bd0b8286cebade Mon Sep 17 00:00:00 2001 From: Eric Payne Date: Wed, 6 Apr 2016 20:20:14 +0000 Subject: [PATCH] HDFS-9945. Datanode command for evicting writers. Contributed by Kihwal Lee --- .../hdfs/protocol/ClientDatanodeProtocol.java | 7 +++ .../ClientDatanodeProtocolTranslatorPB.java | 12 +++++ .../main/proto/ClientDatanodeProtocol.proto | 10 ++++ ...atanodeProtocolServerSideTranslatorPB.java | 15 ++++++ .../hdfs/server/datanode/BlockReceiver.java | 3 ++ .../hadoop/hdfs/server/datanode/DataNode.java | 7 +++ .../hdfs/server/datanode/DataXceiver.java | 48 +++++++++++++++--- .../server/datanode/DataXceiverServer.java | 6 +++ .../apache/hadoop/hdfs/tools/DFSAdmin.java | 21 ++++++++ ...TestClientProtocolForPipelineRecovery.java | 49 +++++++++++++++++++ 10 files changed, 170 insertions(+), 8 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index 08547c1dc1b..e5413884535 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -121,6 +121,13 @@ public interface ClientDatanodeProtocol { */ void shutdownDatanode(boolean forUpgrade) throws IOException; + /** + * Evict clients that are writing to a datanode. + * + * @throws IOException + */ + void evictWriters() throws IOException; + /** * Obtains datanode info * diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 2fffffd9344..6aaa0257d86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; @@ -97,6 +98,8 @@ public class ClientDatanodeProtocolTranslatorPB implements private static final GetBalancerBandwidthRequestProto VOID_GET_BALANCER_BANDWIDTH = GetBalancerBandwidthRequestProto.newBuilder().build(); + private final static EvictWritersRequestProto VOID_EVICT_WRITERS = + EvictWritersRequestProto.newBuilder().build(); public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, Configuration conf, int socketTimeout, boolean connectToDnViaHostname, @@ -243,6 +246,15 @@ public class ClientDatanodeProtocolTranslatorPB implements } } + @Override + public void evictWriters() throws IOException { + try { + rpcProxy.evictWriters(NULL_CONTROLLER, VOID_EVICT_WRITERS); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + @Override public DatanodeLocalInfo getDatanodeInfo() throws IOException { GetDatanodeInfoResponseProto response; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto index 954fedc5e36..e135df84fb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto @@ -114,6 +114,13 @@ message ShutdownDatanodeRequestProto { message ShutdownDatanodeResponseProto { } +/** Tell datanode to evict active clients that are writing */ +message EvictWritersRequestProto { +} + +message EvictWritersResponseProto { +} + /** * Ping datanode for liveness and quick info */ @@ -176,6 +183,9 @@ service ClientDatanodeProtocolService { rpc shutdownDatanode(ShutdownDatanodeRequestProto) returns(ShutdownDatanodeResponseProto); + rpc evictWriters(EvictWritersRequestProto) + returns(EvictWritersResponseProto); + rpc getDatanodeInfo(GetDatanodeInfoRequestProto) returns(GetDatanodeInfoResponseProto); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index 0feecc142de..e0401f739f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; @@ -67,6 +69,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements StartReconfigurationResponseProto.newBuilder().build(); private final static TriggerBlockReportResponseProto TRIGGER_BLOCK_REPORT_RESP = TriggerBlockReportResponseProto.newBuilder().build(); + private final static EvictWritersResponseProto EVICT_WRITERS_RESP = + EvictWritersResponseProto.newBuilder().build(); private final ClientDatanodeProtocol impl; @@ -142,6 +146,17 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements return SHUTDOWN_DATANODE_RESP; } + @Override + public EvictWritersResponseProto evictWriters(RpcController unused, + EvictWritersRequestProto request) throws ServiceException { + try { + impl.evictWriters(); + } catch (IOException e) { + throw new ServiceException(e); + } + return EVICT_WRITERS_RESP; + } + public GetDatanodeInfoResponseProto getDatanodeInfo(RpcController unused, GetDatanodeInfoRequestProto request) throws ServiceException { GetDatanodeInfoResponseProto res; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index fb0c1c5b1c3..8f9138ca6bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -889,6 +889,9 @@ class BlockReceiver implements Closeable { } public void sendOOB() throws IOException, InterruptedException { + if (isDatanode) { + return; + } ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck .getRestartOOBStatus()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 989afbed1da..625eb3f3b6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2973,6 +2973,13 @@ public class DataNode extends ReconfigurableBase shutdownThread.start(); } + @Override //ClientDatanodeProtocol + public void evictWriters() throws IOException { + checkSuperuserPrivilege(); + LOG.info("Evicting all writers."); + xserver.stopWriters(); + } + @Override //ClientDatanodeProtocol public DatanodeLocalInfo getDatanodeInfo() { long uptime = ManagementFactory.getRuntimeMXBean().getUptime()/1000; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 63bf5ae5363..d5dc3281f10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -116,6 +116,7 @@ class DataXceiver extends Receiver implements Runnable { private BlockReceiver blockReceiver = null; private final int ioFileBufferSize; private final int smallBufferSize; + private Thread xceiver = null; /** * Client Name used in previous operation. Not available on first request @@ -178,9 +179,38 @@ class DataXceiver extends Receiver implements Runnable { } public void sendOOB() throws IOException, InterruptedException { + BlockReceiver br = getCurrentBlockReceiver(); + if (br == null) { + return; + } + // This doesn't need to be in a critical section. Althogh the client + // can resue the connection to issue a different request, trying sending + // an OOB through the recently closed block receiver is harmless. LOG.info("Sending OOB to peer: " + peer); - if(blockReceiver!=null) - blockReceiver.sendOOB(); + br.sendOOB(); + } + + public void stopWriter() { + // We want to interrupt the xceiver only when it is serving writes. + synchronized(this) { + if (getCurrentBlockReceiver() == null) { + return; + } + xceiver.interrupt(); + } + LOG.info("Stopped the writer: " + peer); + } + + /** + * blockReceiver is updated at multiple places. Use the synchronized setter + * and getter. + */ + private synchronized void setCurrentBlockReceiver(BlockReceiver br) { + blockReceiver = br; + } + + private synchronized BlockReceiver getCurrentBlockReceiver() { + return blockReceiver; } /** @@ -192,6 +222,9 @@ class DataXceiver extends Receiver implements Runnable { Op op = null; try { + synchronized(this) { + xceiver = Thread.currentThread(); + } dataXceiverServer.addPeer(peer, Thread.currentThread(), this); peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout); InputStream input = socketIn; @@ -679,12 +712,12 @@ class DataXceiver extends Receiver implements Runnable { if (isDatanode || stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { // open a block receiver - blockReceiver = getBlockReceiver(block, storageType, in, + setCurrentBlockReceiver(getBlockReceiver(block, storageType, in, peer.getRemoteAddressString(), peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode, requestedChecksum, - cachingStrategy, allowLazyPersist, pinning); + cachingStrategy, allowLazyPersist, pinning)); replica = blockReceiver.getReplica(); } else { replica = datanode.data.recoverClose( @@ -853,7 +886,7 @@ class DataXceiver extends Receiver implements Runnable { IOUtils.closeStream(replyOut); IOUtils.closeSocket(mirrorSock); IOUtils.closeStream(blockReceiver); - blockReceiver = null; + setCurrentBlockReceiver(null); } //update metrics @@ -1060,7 +1093,6 @@ class DataXceiver extends Receiver implements Runnable { DataOutputStream proxyOut = null; Status opStatus = SUCCESS; String errMsg = null; - BlockReceiver blockReceiver = null; DataInputStream proxyReply = null; boolean IoeDuringCopyBlockOperation = false; try { @@ -1119,11 +1151,11 @@ class DataXceiver extends Receiver implements Runnable { DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); // open a block receiver and check if the block does not exist - blockReceiver = getBlockReceiver(block, storageType, + setCurrentBlockReceiver(getBlockReceiver(block, storageType, proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0, 0, 0, "", null, datanode, remoteChecksum, - CachingStrategy.newDropBehind(), false, false); + CachingStrategy.newDropBehind(), false, false)); // receive a block blockReceiver.receiveBlock(null, null, replyOut, null, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index 10945e71682..126d5b10d42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -256,6 +256,12 @@ class DataXceiverServer implements Runnable { } } } + + public synchronized void stopWriters() { + for (Peer p : peers.keySet()) { + peersXceiver.get(p).stopWriter(); + } + } // Notify all peers of the shutdown and restart. // datanode.shouldRun should still be true and datanode.restarting should diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index d84d66449df..a35246f38b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -1090,6 +1090,10 @@ public class DFSAdmin extends FsShell { + "\tclients will timeout and ignore the datanode. In such case, the\n" + "\tfast start-up mode will also be disabled.\n"; + String evictWriters = "-evictWriters \n" + + "\tMake the datanode evict all clients that are writing a block.\n" + + "\tThis is useful if decommissioning is hung due to slow writers.\n"; + String getDatanodeInfo = "-getDatanodeInfo \n" + "\tGet the information about the given datanode. This command can\n" + "\tbe used for checking if a datanode is alive.\n"; @@ -1159,6 +1163,8 @@ public class DFSAdmin extends FsShell { System.out.println(disallowSnapshot); } else if ("shutdownDatanode".equalsIgnoreCase(cmd)) { System.out.println(shutdownDatanode); + } else if ("evictWriters".equalsIgnoreCase(cmd)) { + System.out.println(evictWriters); } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) { System.out.println(getDatanodeInfo); } else if ("help".equals(cmd)) { @@ -1193,6 +1199,7 @@ public class DFSAdmin extends FsShell { System.out.println(allowSnapshot); System.out.println(disallowSnapshot); System.out.println(shutdownDatanode); + System.out.println(evictWriters); System.out.println(getDatanodeInfo); System.out.println(triggerBlockReport); System.out.println(help); @@ -2047,6 +2054,8 @@ public class DFSAdmin extends FsShell { exitCode = fetchImage(argv, i); } else if ("-shutdownDatanode".equals(cmd)) { exitCode = shutdownDatanode(argv, i); + } else if ("-evictWriters".equals(cmd)) { + exitCode = evictWriters(argv, i); } else if ("-getDatanodeInfo".equals(cmd)) { exitCode = getDatanodeInfo(argv, i); } else if ("-reconfig".equals(cmd)) { @@ -2171,6 +2180,18 @@ public class DFSAdmin extends FsShell { return 0; } + private int evictWriters(String[] argv, int i) throws IOException { + final String dn = argv[i]; + ClientDatanodeProtocol dnProxy = getDataNodeProxy(dn); + try { + dnProxy.evictWriters(); + System.out.println("Requested writer eviction to datanode " + dn); + } catch (IOException ioe) { + return -1; + } + return 0; + } + private int getDatanodeInfo(String[] argv, int i) throws IOException { ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java index 0eeb3b78edf..5e320fa29a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java @@ -271,6 +271,55 @@ public class TestClientProtocolForPipelineRecovery { } } + /** + * Test that the writer is kicked out of a node. + */ + @Test + public void testEvictWriter() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes((int)3) + .build(); + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + Path file = new Path("testEvictWriter.dat"); + FSDataOutputStream out = fs.create(file, (short)2); + out.write(0x31); + out.hflush(); + + // get nodes in the pipeline + DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream(); + DatanodeInfo[] nodes = dfsOut.getPipeline(); + Assert.assertEquals(2, nodes.length); + String dnAddr = nodes[1].getIpcAddr(false); + + // evict the writer from the second datanode and wait until + // the pipeline is rebuilt. + DFSAdmin dfsadmin = new DFSAdmin(conf); + final String[] args1 = {"-evictWriters", dnAddr }; + Assert.assertEquals(0, dfsadmin.run(args1)); + out.write(0x31); + out.hflush(); + + // get the new pipline and check the node is not in there. + nodes = dfsOut.getPipeline(); + try { + Assert.assertTrue(nodes.length > 0 ); + for (int i = 0; i < nodes.length; i++) { + Assert.assertFalse(dnAddr.equals(nodes[i].getIpcAddr(false))); + } + } finally { + out.close(); + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + /** Test restart timeout */ @Test public void testPipelineRecoveryOnRestartFailure() throws Exception {