HDFS-6569. Merging change r1618742 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1618747 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2014-08-18 21:26:45 +00:00
parent ab1c7bee12
commit d926fd12cf
6 changed files with 108 additions and 13 deletions

View File

@ -259,6 +259,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6825. Edit log corruption due to delayed block removal.
(Yongjun Zhang via wang)
HDFS-6569. OOB message can't be sent to the client when DataNode shuts down for upgrade
(brandonli)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -713,6 +713,11 @@ private void manageWriterOsCache(long offsetInBlock) {
}
}
public void sendOOB() throws IOException, InterruptedException {
((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
.getRestartOOBStatus());
}
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
@ -801,8 +806,6 @@ void receiveBlock(
// Client will fall back to regular pipeline recovery.
}
try {
((PacketResponder) responder.getRunnable()).
sendOOBResponse(PipelineAck.getRestartOOBStatus());
// Even if the connection is closed after the ack packet is
// flushed, the client can react to the connection closure
// first. Insert a delay to lower the chance of client
@ -810,8 +813,6 @@ void receiveBlock(
Thread.sleep(1000);
} catch (InterruptedException ie) {
// It is already going down. Ignore this.
} catch (IOException ioe) {
LOG.info("Error sending OOB Ack.", ioe);
}
}
responder.interrupt();

View File

@ -273,6 +273,7 @@ public static InetSocketAddress createSocketAddr(String target) {
public final static String EMPTY_DEL_HINT = "";
final AtomicInteger xmitsInProgress = new AtomicInteger();
Daemon dataXceiverServer = null;
DataXceiverServer xserver = null;
Daemon localDataXceiverServer = null;
ShortCircuitRegistry shortCircuitRegistry = null;
ThreadGroup threadGroup = null;
@ -656,8 +657,8 @@ private void initDataXceiver(Configuration conf) throws IOException {
streamingAddr = tcpPeerServer.getStreamingAddr();
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(tcpPeerServer, conf, this));
xserver = new DataXceiverServer(tcpPeerServer, conf, this);
this.dataXceiverServer = new Daemon(threadGroup, xserver);
this.threadGroup.setDaemon(true); // auto destroy when empty
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
@ -1144,6 +1145,11 @@ private void registerMXBean() {
dataNodeInfoBeanName = MBeans.register("DataNode", "DataNodeInfo", this);
}
@VisibleForTesting
public DataXceiverServer getXferServer() {
return xserver;
}
@VisibleForTesting
public int getXferPort() {
return streamingAddr.getPort();
@ -1402,6 +1408,7 @@ public void shutdown() {
// in order to avoid any further acceptance of requests, but the peers
// for block writes are not closed until the clients are notified.
if (dataXceiverServer != null) {
xserver.sendOOBToPeers();
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
this.dataXceiverServer.interrupt();
}

View File

@ -103,6 +103,7 @@ class DataXceiver extends Receiver implements Runnable {
private long opStartTime; //the start time of receiving an Op
private final InputStream socketIn;
private OutputStream socketOut;
private BlockReceiver blockReceiver = null;
/**
* Client Name used in previous operation. Not available on first request
@ -159,6 +160,12 @@ private OutputStream getOutputStream() {
return socketOut;
}
public void sendOOB() throws IOException, InterruptedException {
LOG.info("Sending OOB to peer: " + peer);
if(blockReceiver!=null)
blockReceiver.sendOOB();
}
/**
* Read/write data from/to the DataXceiverServer.
*/
@ -168,7 +175,7 @@ public void run() {
Op op = null;
try {
dataXceiverServer.addPeer(peer, Thread.currentThread());
dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
@ -584,7 +591,6 @@ public void writeBlock(final ExtendedBlock block,
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
Socket mirrorSock = null; // socket to next target
BlockReceiver blockReceiver = null; // responsible for data handling
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
@ -747,6 +753,7 @@ public void writeBlock(final ExtendedBlock block,
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
blockReceiver = null;
}
//update metrics

View File

@ -27,11 +27,11 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.PeerServer;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
/**
* Server used for receiving/sending a block of data.
@ -45,6 +45,7 @@ class DataXceiverServer implements Runnable {
private final PeerServer peerServer;
private final DataNode datanode;
private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();
private boolean closed = false;
/**
@ -217,18 +218,38 @@ void kill() {
}
}
synchronized void addPeer(Peer peer, Thread t) throws IOException {
synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)
throws IOException {
if (closed) {
throw new IOException("Server closed.");
}
peers.put(peer, t);
peersXceiver.put(peer, xceiver);
}
synchronized void closePeer(Peer peer) {
peers.remove(peer);
peersXceiver.remove(peer);
IOUtils.cleanup(null, peer);
}
// Sending OOB to all peers
public synchronized void sendOOBToPeers() {
if (!datanode.shutdownForUpgrade) {
return;
}
for (Peer p : peers.keySet()) {
try {
peersXceiver.get(p).sendOOB();
} catch (IOException e) {
LOG.warn("Got error when sending OOB message.", e);
} catch (InterruptedException e) {
LOG.warn("Interrupted when sending OOB message.");
}
}
}
// Notify all peers of the shutdown and restart.
// datanode.shouldRun should still be true and datanode.restarting should
// be set true before calling this method.
@ -247,6 +268,7 @@ synchronized void closeAllPeers() {
IOUtils.cleanup(LOG, p);
}
peers.clear();
peersXceiver.clear();
}
// Return the number of peers.
@ -254,7 +276,14 @@ synchronized int getNumPeers() {
return peers.size();
}
// Return the number of peers and DataXceivers.
@VisibleForTesting
synchronized int getNumPeersXceiver() {
return peersXceiver.size();
}
synchronized void releasePeer(Peer peer) {
peers.remove(peer);
peersXceiver.remove(peer);
}
}

View File

@ -27,11 +27,14 @@
import java.io.File;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -67,6 +70,7 @@ public class TestDataNodeRollingUpgrade {
private void startCluster() throws IOException {
conf = new HdfsConfiguration();
conf.setInt("dfs.blocksize", 1024*1024);
cluster = new Builder(conf).numDataNodes(REPL_FACTOR).build();
cluster.waitActive();
fs = cluster.getFileSystem();
@ -243,4 +247,48 @@ public void testDatanodeRollingUpgradeWithRollback() throws Exception {
shutdownCluster();
}
}
@Test (timeout=600000)
// Test DatanodeXceiver has correct peer-dataxceiver pairs for sending OOB message
public void testDatanodePeersXceiver() throws Exception {
try {
startCluster();
// Create files in DFS.
String testFile1 = "/TestDataNodeXceiver1.dat";
String testFile2 = "/TestDataNodeXceiver2.dat";
String testFile3 = "/TestDataNodeXceiver3.dat";
DFSClient client1 = new DFSClient(NameNode.getAddress(conf), conf);
DFSClient client2 = new DFSClient(NameNode.getAddress(conf), conf);
DFSClient client3 = new DFSClient(NameNode.getAddress(conf), conf);
DFSOutputStream s1 = (DFSOutputStream) client1.create(testFile1, true);
DFSOutputStream s2 = (DFSOutputStream) client2.create(testFile2, true);
DFSOutputStream s3 = (DFSOutputStream) client3.create(testFile3, true);
byte[] toWrite = new byte[1024*1024*8];
Random rb = new Random(1111);
rb.nextBytes(toWrite);
s1.write(toWrite, 0, 1024*1024*8);
s1.flush();
s2.write(toWrite, 0, 1024*1024*8);
s2.flush();
s3.write(toWrite, 0, 1024*1024*8);
s3.flush();
assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer()
.getNumPeersXceiver());
s1.close();
s2.close();
s3.close();
assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer()
.getNumPeersXceiver());
client1.close();
client2.close();
client3.close();
} finally {
shutdownCluster();
}
}
}