HDFS-9945. Datanode command for evicting writers. Contributed by Kihwal Lee

(cherry picked from commit aede8c10ec)
This commit is contained in:
Eric Payne 2016-04-06 20:20:14 +00:00
parent 387d3f25c1
commit caef78afe6
10 changed files with 170 additions and 8 deletions

View File

@ -140,6 +140,13 @@ HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
*/
void shutdownDatanode(boolean forUpgrade) throws IOException;
/**
 * Evict clients that are writing to a datanode.
 *
 * @throws IOException if the eviction request fails
 */
void evictWriters() throws IOException;
/**
* Obtains datanode info
*

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@ -104,6 +105,8 @@ public class ClientDatanodeProtocolTranslatorPB implements
private static final GetBalancerBandwidthRequestProto
VOID_GET_BALANCER_BANDWIDTH =
GetBalancerBandwidthRequestProto.newBuilder().build();
private final static EvictWritersRequestProto VOID_EVICT_WRITERS =
EvictWritersRequestProto.newBuilder().build();
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@ -286,6 +289,15 @@ public void shutdownDatanode(boolean forUpgrade) throws IOException {
}
}
/**
 * Issues the evictWriters RPC through {@code rpcProxy}. The request carries
 * no arguments, so the shared empty {@code VOID_EVICT_WRITERS} message is
 * sent.
 *
 * @throws IOException the exception obtained from
 *         {@code ProtobufHelper.getRemoteException} when the RPC raises a
 *         ServiceException
 */
@Override
public void evictWriters() throws IOException {
try {
rpcProxy.evictWriters(NULL_CONTROLLER, VOID_EVICT_WRITERS);
} catch (ServiceException e) {
// Translate the RPC-layer ServiceException into an IOException for callers.
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public DatanodeLocalInfo getDatanodeInfo() throws IOException {
GetDatanodeInfoResponseProto response;

View File

@ -140,6 +140,13 @@ message ShutdownDatanodeRequestProto {
message ShutdownDatanodeResponseProto {
}
/**
 * Tell datanode to evict active clients that are writing.
 * The request carries no fields.
 */
message EvictWritersRequestProto {
}
/** Reply to EvictWritersRequestProto; carries no fields. */
message EvictWritersResponseProto {
}
/**
* Ping datanode for liveness and quick info
*/
@ -209,6 +216,9 @@ service ClientDatanodeProtocolService {
rpc shutdownDatanode(ShutdownDatanodeRequestProto)
returns(ShutdownDatanodeResponseProto);
rpc evictWriters(EvictWritersRequestProto)
returns(EvictWritersResponseProto);
rpc getDatanodeInfo(GetDatanodeInfoRequestProto)
returns(GetDatanodeInfoResponseProto);

View File

@ -29,6 +29,8 @@
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@ -79,6 +81,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
StartReconfigurationResponseProto.newBuilder().build();
private final static TriggerBlockReportResponseProto TRIGGER_BLOCK_REPORT_RESP =
TriggerBlockReportResponseProto.newBuilder().build();
private final static EvictWritersResponseProto EVICT_WRITERS_RESP =
EvictWritersResponseProto.newBuilder().build();
private final ClientDatanodeProtocol impl;
@ -186,6 +190,17 @@ public ShutdownDatanodeResponseProto shutdownDatanode(
return SHUTDOWN_DATANODE_RESP;
}
/**
 * Server-side entry point for the evictWriters RPC: delegates to the wrapped
 * {@code impl} and answers with the shared empty response.
 *
 * @param unused RPC controller, not used
 * @param request the (empty) eviction request
 * @return the pre-built {@code EVICT_WRITERS_RESP} message
 * @throws ServiceException wrapping any IOException raised by {@code impl}
 */
@Override
public EvictWritersResponseProto evictWriters(RpcController unused,
    EvictWritersRequestProto request) throws ServiceException {
  try {
    impl.evictWriters();
    return EVICT_WRITERS_RESP;
  } catch (IOException ioe) {
    // Surface the failure to the RPC layer with the cause preserved.
    throw new ServiceException(ioe);
  }
}
public GetDatanodeInfoResponseProto getDatanodeInfo(RpcController unused,
GetDatanodeInfoRequestProto request) throws ServiceException {
GetDatanodeInfoResponseProto res;

View File

@ -889,6 +889,9 @@ private void manageWriterOsCache(long offsetInBlock) {
}
/**
 * Sends the restart out-of-band status through this receiver's packet
 * responder. Nothing is sent when {@code isDatanode} is set.
 *
 * @throws IOException if thrown while sending the OOB response
 * @throws InterruptedException if thrown while sending the OOB response
 */
public void sendOOB() throws IOException, InterruptedException {
  if (!isDatanode) {
    PacketResponder packetResponder = (PacketResponder) responder.getRunnable();
    packetResponder.sendOOBResponse(PipelineAck.getRestartOOBStatus());
  }
}

View File

@ -2948,6 +2948,13 @@ public synchronized void shutdownDatanode(boolean forUpgrade) throws IOException
shutdownThread.start();
}
/**
 * Evicts all writers from this datanode: checks the caller's superuser
 * privilege first, logs the request, then asks {@code xserver} to stop its
 * writers.
 *
 * @throws IOException if raised by the privilege check
 */
@Override //ClientDatanodeProtocol
public void evictWriters() throws IOException {
// Must come first so that nothing happens for an unprivileged caller.
checkSuperuserPrivilege();
LOG.info("Evicting all writers.");
xserver.stopWriters();
}
@Override //ClientDatanodeProtocol
public DatanodeLocalInfo getDatanodeInfo() {
long uptime = ManagementFactory.getRuntimeMXBean().getUptime()/1000;

View File

@ -115,6 +115,7 @@ class DataXceiver extends Receiver implements Runnable {
private BlockReceiver blockReceiver = null;
private final int ioFileBufferSize;
private final int smallBufferSize;
private Thread xceiver = null;
/**
* Client Name used in previous operation. Not available on first request
@ -177,9 +178,38 @@ private OutputStream getOutputStream() {
}
/**
 * Sends an out-of-band (OOB) response through the block receiver that is
 * currently attached to this xceiver. Does nothing when no block receiver
 * is set.
 *
 * The receiver is read once through the synchronized getter and that single
 * snapshot is used for the send, so exactly one OOB is sent per call.
 *
 * @throws IOException if thrown by the block receiver while sending
 * @throws InterruptedException if thrown by the block receiver while sending
 */
public void sendOOB() throws IOException, InterruptedException {
  BlockReceiver br = getCurrentBlockReceiver();
  if (br == null) {
    return;
  }
  // This doesn't need to be in a critical section. Although the client
  // can reuse the connection to issue a different request, trying to send
  // an OOB through the recently closed block receiver is harmless.
  LOG.info("Sending OOB to peer: " + peer);
  br.sendOOB();
}
/**
 * Interrupts the xceiver thread, but only while a block receiver is set,
 * i.e. while this xceiver is serving a write. The check and the interrupt
 * happen under this object's monitor; the log line is emitted only when an
 * interrupt was actually issued.
 */
public void stopWriter() {
  synchronized (this) {
    final boolean servingWrite = getCurrentBlockReceiver() != null;
    if (!servingWrite) {
      // Not writing: leave the thread alone.
      return;
    }
    xceiver.interrupt();
  }
  LOG.info("Stopped the writer: " + peer);
}
/**
 * Sets the current block receiver while holding this object's monitor.
 * blockReceiver is updated at multiple places, so updates should go through
 * this synchronized setter rather than assigning the field directly.
 *
 * @param br the new block receiver; may be null to clear it
 */
private synchronized void setCurrentBlockReceiver(BlockReceiver br) {
blockReceiver = br;
}
/**
 * Returns the current block receiver, read while holding this object's
 * monitor.
 *
 * @return the value of the blockReceiver field, possibly null
 */
private synchronized BlockReceiver getCurrentBlockReceiver() {
return blockReceiver;
}
/**
@ -191,6 +221,9 @@ public void run() {
Op op = null;
try {
synchronized(this) {
xceiver = Thread.currentThread();
}
dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
@ -678,12 +711,12 @@ public void writeBlock(final ExtendedBlock block,
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
blockReceiver = getBlockReceiver(block, storageType, in,
setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy, allowLazyPersist, pinning);
cachingStrategy, allowLazyPersist, pinning));
replica = blockReceiver.getReplica();
} else {
replica = datanode.data.recoverClose(
@ -852,7 +885,7 @@ public void writeBlock(final ExtendedBlock block,
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
blockReceiver = null;
setCurrentBlockReceiver(null);
}
//update metrics
@ -1079,7 +1112,6 @@ public void replaceBlock(final ExtendedBlock block,
DataOutputStream proxyOut = null;
Status opStatus = SUCCESS;
String errMsg = null;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
boolean IoeDuringCopyBlockOperation = false;
try {
@ -1138,11 +1170,11 @@ public void replaceBlock(final ExtendedBlock block,
DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
checksumInfo.getChecksum());
// open a block receiver and check if the block does not exist
blockReceiver = getBlockReceiver(block, storageType,
setCurrentBlockReceiver(getBlockReceiver(block, storageType,
proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
CachingStrategy.newDropBehind(), false, false);
CachingStrategy.newDropBehind(), false, false));
// receive a block
blockReceiver.receiveBlock(null, null, replyOut, null,

View File

@ -256,6 +256,12 @@ public synchronized void sendOOBToPeers() {
}
}
}
/**
 * Calls stopWriter() on the xceiver registered in peersXceiver for every
 * peer in peers. Runs while holding this server's monitor.
 */
public synchronized void stopWriters() {
for (Peer p : peers.keySet()) {
// Look up the xceiver serving this peer and ask it to stop writing.
peersXceiver.get(p).stopWriter();
}
}
// Notify all peers of the shutdown and restart.
// datanode.shouldRun should still be true and datanode.restarting should

View File

@ -1062,6 +1062,10 @@ private void printHelp(String cmd) {
+ "\tclients will timeout and ignore the datanode. In such case, the\n"
+ "\tfast start-up mode will also be disabled.\n";
String evictWriters = "-evictWriters <datanode_host:ipc_port>\n"
+ "\tMake the datanode evict all clients that are writing a block.\n"
+ "\tThis is useful if decommissioning is hung due to slow writers.\n";
String getDatanodeInfo = "-getDatanodeInfo <datanode_host:ipc_port>\n"
+ "\tGet the information about the given datanode. This command can\n"
+ "\tbe used for checking if a datanode is alive.\n";
@ -1131,6 +1135,8 @@ private void printHelp(String cmd) {
System.out.println(disallowSnapshot);
} else if ("shutdownDatanode".equalsIgnoreCase(cmd)) {
System.out.println(shutdownDatanode);
} else if ("evictWriters".equalsIgnoreCase(cmd)) {
System.out.println(evictWriters);
} else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
System.out.println(getDatanodeInfo);
} else if ("help".equals(cmd)) {
@ -1165,6 +1171,7 @@ private void printHelp(String cmd) {
System.out.println(allowSnapshot);
System.out.println(disallowSnapshot);
System.out.println(shutdownDatanode);
System.out.println(evictWriters);
System.out.println(getDatanodeInfo);
System.out.println(triggerBlockReport);
System.out.println(help);
@ -2020,6 +2027,8 @@ public int run(String[] argv) throws Exception {
exitCode = fetchImage(argv, i);
} else if ("-shutdownDatanode".equals(cmd)) {
exitCode = shutdownDatanode(argv, i);
} else if ("-evictWriters".equals(cmd)) {
exitCode = evictWriters(argv, i);
} else if ("-getDatanodeInfo".equals(cmd)) {
exitCode = getDatanodeInfo(argv, i);
} else if ("-reconfig".equals(cmd)) {
@ -2144,6 +2153,18 @@ private int shutdownDatanode(String[] argv, int i) throws IOException {
return 0;
}
/**
 * Handles {@code -evictWriters <datanode_host:ipc_port>}: asks the given
 * datanode to evict the clients that are writing to it.
 *
 * @param argv command-line arguments
 * @param i index in {@code argv} of the datanode address
 * @return 0 when the request was issued, -1 when the RPC failed
 * @throws IOException if raised while obtaining the datanode proxy
 */
private int evictWriters(String[] argv, int i) throws IOException {
  final String dn = argv[i];
  ClientDatanodeProtocol dnProxy = getDataNodeProxy(dn);
  try {
    dnProxy.evictWriters();
    System.out.println("Requested writer eviction to datanode " + dn);
  } catch (IOException ioe) {
    // Tell the user why the command failed instead of silently
    // returning a non-zero exit code.
    System.err.println("Failed to request writer eviction on datanode "
        + dn + ": " + ioe.getMessage());
    return -1;
  }
  return 0;
}
private int getDatanodeInfo(String[] argv, int i) throws IOException {
ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
try {

View File

@ -271,6 +271,55 @@ public void testPipelineRecoveryOnOOB() throws Exception {
}
}
/**
 * Test that the writer is kicked out of a node: after -evictWriters is run
 * against the second datanode of a two-node pipeline, the rebuilt pipeline
 * must not contain that datanode.
 */
@Test
public void testEvictWriter() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    cluster.waitActive();

    final FileSystem fs = cluster.getFileSystem();
    final Path file = new Path("testEvictWriter.dat");
    final FSDataOutputStream out = fs.create(file, (short) 2);
    out.write(0x31);
    out.hflush();

    // get nodes in the pipeline
    final DFSOutputStream dfsOut = (DFSOutputStream) out.getWrappedStream();
    DatanodeInfo[] pipeline = dfsOut.getPipeline();
    Assert.assertEquals(2, pipeline.length);
    final String evictedAddr = pipeline[1].getIpcAddr(false);

    // evict the writer from the second datanode and wait until
    // the pipeline is rebuilt.
    final DFSAdmin dfsadmin = new DFSAdmin(conf);
    final String[] evictArgs = {"-evictWriters", evictedAddr};
    Assert.assertEquals(0, dfsadmin.run(evictArgs));
    out.write(0x31);
    out.hflush();

    // get the new pipeline and check the node is not in there.
    pipeline = dfsOut.getPipeline();
    try {
      Assert.assertTrue(pipeline.length > 0);
      for (DatanodeInfo node : pipeline) {
        Assert.assertFalse(evictedAddr.equals(node.getIpcAddr(false)));
      }
    } finally {
      out.close();
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/** Test restart timeout */
@Test
public void testPipelineRecoveryOnRestartFailure() throws Exception {