Backport HDFS-4200 from trunk: Reduce the size of synchronized sections in PacketResponder.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1573044 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-02-28 19:38:31 +00:00
parent 4d54763a07
commit b10d601a54
2 changed files with 198 additions and 172 deletions

View File

@ -120,6 +120,9 @@ Release 2.4.0 - UNRELEASED
HDFS-6030. Remove an unused constructor in INode.java. (yzhang via HDFS-6030. Remove an unused constructor in INode.java. (yzhang via
cmccabe) cmccabe)
HDFS-4200. Reduce the size of synchronized sections in PacketResponder.
(Suresh Srinivas, backported by Andrew Wang, committed by jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

View File

@ -693,10 +693,7 @@ class BlockReceiver implements Closeable {
responder.start(); // start thread to processes responses responder.start(); // start thread to processes responses
} }
/* while (receivePacket() >= 0) { /* Receive until the last packet */ }
* Receive until the last packet.
*/
while (receivePacket() >= 0) {}
// wait for all outstanding packet responses. And then // wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown. // indicate responder to gracefully shutdown.
@ -785,7 +782,7 @@ class BlockReceiver implements Closeable {
static private long checksum2long(byte[] checksum) { static private long checksum2long(byte[] checksum) {
long crc = 0L; long crc = 0L;
for(int i=0; i<checksum.length; i++) { for(int i=0; i<checksum.length; i++) {
crc |= (0xffL&(long)checksum[i])<<((checksum.length-i-1)*8); crc |= (0xffL&checksum[i])<<((checksum.length-i-1)*8);
} }
return crc; return crc;
} }
@ -844,24 +841,23 @@ class BlockReceiver implements Closeable {
NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
} }
private static Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS, Status.ERROR};
/** /**
* Processes responses from downstream datanodes in the pipeline * Processes responses from downstream datanodes in the pipeline
* and sends back replies to the originator. * and sends back replies to the originator.
*/ */
class PacketResponder implements Runnable, Closeable { class PacketResponder implements Runnable, Closeable {
/** queue for packets waiting for ack - synchronization using monitor lock */
/** queue for packets waiting for ack */
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
/** the thread that spawns this responder */ /** the thread that spawns this responder */
private final Thread receiverThread = Thread.currentThread(); private final Thread receiverThread = Thread.currentThread();
/** is this responder running? */ /** is this responder running? - synchronization using monitor lock */
private volatile boolean running = true; private volatile boolean running = true;
/** input from the next downstream datanode */ /** input from the next downstream datanode */
private final DataInputStream downstreamIn; private final DataInputStream downstreamIn;
/** output to upstream datanode/client */ /** output to upstream datanode/client */
private final DataOutputStream upstreamOut; private final DataOutputStream upstreamOut;
/** The type of this responder */ /** The type of this responder */
private final PacketResponderType type; private final PacketResponderType type;
/** for log and error messages */ /** for log and error messages */
@ -873,8 +869,7 @@ class BlockReceiver implements Closeable {
} }
PacketResponder(final DataOutputStream upstreamOut, PacketResponder(final DataOutputStream upstreamOut,
final DataInputStream downstreamIn, final DataInputStream downstreamIn, final DatanodeInfo[] downstreams) {
final DatanodeInfo[] downstreams) {
this.downstreamIn = downstreamIn; this.downstreamIn = downstreamIn;
this.upstreamOut = upstreamOut; this.upstreamOut = upstreamOut;
@ -891,24 +886,41 @@ class BlockReceiver implements Closeable {
this.myString = b.toString(); this.myString = b.toString();
} }
private boolean isRunning() {
return running && datanode.shouldRun;
}
/** /**
* enqueue the seqno that is still be to acked by the downstream datanode. * enqueue the seqno that is still be to acked by the downstream datanode.
* @param seqno * @param seqno
* @param lastPacketInBlock * @param lastPacketInBlock
* @param offsetInBlock * @param offsetInBlock
*/ */
synchronized void enqueue(final long seqno, void enqueue(final long seqno, final boolean lastPacketInBlock,
final boolean lastPacketInBlock,
final long offsetInBlock, final Status ackStatus) { final long offsetInBlock, final Status ackStatus) {
if (running) { final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock, System.nanoTime(), ackStatus);
System.nanoTime(), ackStatus); if(LOG.isDebugEnabled()) {
if(LOG.isDebugEnabled()) { LOG.debug(myString + ": enqueue " + p);
LOG.debug(myString + ": enqueue " + p);
}
ackQueue.addLast(p);
notifyAll();
} }
synchronized(this) {
if (running) {
ackQueue.addLast(p);
notifyAll();
}
}
}
/** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */
synchronized Packet waitForAckHead(long seqno) throws InterruptedException {
while (isRunning() && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(myString + ": seqno=" + seqno +
" waiting for local datanode to finish write.");
}
wait();
}
return isRunning() ? ackQueue.getFirst() : null;
} }
/** /**
@ -916,7 +928,7 @@ class BlockReceiver implements Closeable {
*/ */
@Override @Override
public synchronized void close() { public synchronized void close() {
while (running && ackQueue.size() != 0 && datanode.shouldRun) { while (isRunning() && ackQueue.size() != 0) {
try { try {
wait(); wait();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -939,166 +951,98 @@ class BlockReceiver implements Closeable {
public void run() { public void run() {
boolean lastPacketInBlock = false; boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) { while (isRunning() && !lastPacketInBlock) {
long totalAckTimeNanos = 0; long totalAckTimeNanos = 0;
boolean isInterrupted = false; boolean isInterrupted = false;
try { try {
Packet pkt = null; Packet pkt = null;
long expected = -2; long expected = -2;
PipelineAck ack = new PipelineAck(); PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO; long seqno = PipelineAck.UNKOWN_SEQNO;
long ackRecvNanoTime = 0; long ackRecvNanoTime = 0;
try { try {
if (type != PacketResponderType.LAST_IN_PIPELINE if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
&& !mirrorError) { // read an ack from downstream datanode
// read an ack from downstream datanode ack.readFields(downstreamIn);
ack.readFields(downstreamIn); ackRecvNanoTime = System.nanoTime();
ackRecvNanoTime = System.nanoTime(); if (LOG.isDebugEnabled()) {
if (LOG.isDebugEnabled()) { LOG.debug(myString + " got " + ack);
LOG.debug(myString + " got " + ack);
}
seqno = ack.getSeqno();
} }
if (seqno != PipelineAck.UNKOWN_SEQNO seqno = ack.getSeqno();
|| type == PacketResponderType.LAST_IN_PIPELINE) { }
synchronized (this) { if (seqno != PipelineAck.UNKOWN_SEQNO
while (running && datanode.shouldRun && ackQueue.size() == 0) { || type == PacketResponderType.LAST_IN_PIPELINE) {
if (LOG.isDebugEnabled()) { pkt = waitForAckHead(seqno);
LOG.debug(myString + ": seqno=" + seqno + if (!isRunning()) {
" waiting for local datanode to finish write."); break;
} }
wait(); expected = pkt.seqno;
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
&& seqno != expected) {
throw new IOException(myString + "seqno: expected=" + expected
+ ", received=" + seqno);
}
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
// The total ack time includes the ack times of downstream
// nodes.
// The value is 0 if this responder doesn't have a downstream
// DN in the pipeline.
totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
// Report the elapsed time from ack send to ack receive minus
// the downstream ack time.
long ackTimeNanos = totalAckTimeNanos
- ack.getDownstreamAckTimeNanos();
if (ackTimeNanos < 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Calculated invalid ack time: " + ackTimeNanos
+ "ns.");
} }
if (!running || !datanode.shouldRun) { } else {
break; datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
}
pkt = ackQueue.getFirst();
expected = pkt.seqno;
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
&& seqno != expected) {
throw new IOException(myString + "seqno: expected="
+ expected + ", received=" + seqno);
}
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
// The total ack time includes the ack times of downstream nodes.
// The value is 0 if this responder doesn't have a downstream
// DN in the pipeline.
totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
// Report the elapsed time from ack send to ack receive minus
// the downstream ack time.
long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
if (ackTimeNanos < 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
}
} else {
datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
}
}
lastPacketInBlock = pkt.lastPacketInBlock;
} }
} }
} catch (InterruptedException ine) { lastPacketInBlock = pkt.lastPacketInBlock;
}
} catch (InterruptedException ine) {
isInterrupted = true;
} catch (IOException ioe) {
if (Thread.interrupted()) {
isInterrupted = true; isInterrupted = true;
} catch (IOException ioe) {
if (Thread.interrupted()) {
isInterrupted = true;
} else {
// continue to run even if can not read from mirror
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
LOG.info(myString, ioe);
}
}
if (Thread.interrupted() || isInterrupted) {
/* The receiver thread cancelled this thread.
* We could also check any other status updates from the
* receiver thread (e.g. if it is ok to write to replyOut).
* It is prudent to not send any more status back to the client
* because this datanode has a problem. The upstream datanode
* will detect that this datanode is bad, and rightly so.
*/
LOG.info(myString + ": Thread is interrupted.");
running = false;
continue;
}
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (lastPacketInBlock) {
BlockReceiver.this.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
if (ClientTraceLog.isInfoEnabled() && isClient) {
long offset = 0;
DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(block.getBlockPoolId());
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
inAddr, myAddr, block.getNumBytes(),
"HDFS_WRITE", clientname, offset,
dnR.getDatanodeUuid(), block, endTime-startTime));
} else {
LOG.info("Received " + block + " size "
+ block.getNumBytes() + " from " + inAddr);
}
}
Status myStatus = pkt == null ? Status.SUCCESS : pkt.ackStatus;
// construct my ack message
Status[] replies = null;
if (mirrorError) { // ack read error
replies = new Status[2];
replies[0] = myStatus;
replies[1] = Status.ERROR;
} else { } else {
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0 // continue to run even if can not read from mirror
: ack.getNumOfReplies(); // notify client of the error
replies = new Status[1+ackLen]; // and wait for the client to shut down the pipeline
replies[0] = myStatus; mirrorError = true;
for (int i=0; i<ackLen; i++) { LOG.info(myString, ioe);
replies[i+1] = ack.getReply(i);
}
// If the mirror has reported that it received a corrupt packet,
// do self-destruct to mark myself bad, instead of the mirror node.
// The mirror is guaranteed to be good without corrupt data.
if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
running = false;
removeAckHead();
LOG.warn("Shutting down writer and responder due to a checksum error.");
receiverThread.interrupt();
continue;
}
} }
PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos); }
if (replyAck.isSuccess() &&
pkt.offsetInBlock > replicaInfo.getBytesAcked())
replicaInfo.setBytesAcked(pkt.offsetInBlock);
// send my ack back to upstream datanode if (Thread.interrupted() || isInterrupted) {
replyAck.write(upstreamOut); /*
upstreamOut.flush(); * The receiver thread cancelled this thread. We could also check
if (LOG.isDebugEnabled()) { * any other status updates from the receiver thread (e.g. if it is
LOG.debug(myString + ", replyAck=" + replyAck); * ok to write to replyOut). It is prudent to not send any more
} * status back to the client because this datanode has a problem.
if (pkt != null) { * The upstream datanode will detect that this datanode is bad, and
// remove the packet from the ack queue * rightly so.
removeAckHead(); */
// update bytes acked LOG.info(myString + ": Thread is interrupted.");
} running = false;
// terminate after sending response if this node detected continue;
// a checksum error }
if (myStatus == Status.ERROR_CHECKSUM) {
running = false; if (lastPacketInBlock) {
LOG.warn("Shutting down writer and responder due to a checksum error."); // Finalize the block and close the block file
receiverThread.interrupt(); finalizeBlock(startTime);
continue; }
}
sendAckUpstream(ack, expected, totalAckTimeNanos,
(pkt != null ? pkt.offsetInBlock : 0),
(pkt != null ? pkt.ackStatus : Status.SUCCESS));
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e); LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) { if (running) {
@ -1124,6 +1068,85 @@ class BlockReceiver implements Closeable {
LOG.info(myString + " terminating"); LOG.info(myString + " terminating");
} }
/**
* Finalize the block and close the block file
* @param startTime time when BlockReceiver started receiving the block
*/
private void finalizeBlock(long startTime) throws IOException {
BlockReceiver.this.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime()
: 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.closeBlock(
block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
if (ClientTraceLog.isInfoEnabled() && isClient) {
long offset = 0;
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
.getBlockPoolId());
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
myAddr, block.getNumBytes(), "HDFS_WRITE", clientname, offset,
dnR.getDatanodeUuid(), block, endTime - startTime));
} else {
LOG.info("Received " + block + " size " + block.getNumBytes()
+ " from " + inAddr);
}
}
/**
* @param ack Ack received from downstream
* @param seqno sequence number of ack to be sent upstream
* @param totalAckTimeNanos total ack time including all the downstream
* nodes
* @param offsetInBlock offset in block for the data in packet
*/
private void sendAckUpstream(PipelineAck ack, long seqno,
long totalAckTimeNanos, long offsetInBlock,
Status myStatus) throws IOException {
Status[] replies = null;
if (mirrorError) { // ack read error
replies = MIRROR_ERROR_STATUS;
} else {
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
.getNumOfReplies();
replies = new Status[1 + ackLen];
replies[0] = myStatus;
for (int i = 0; i < ackLen; i++) {
replies[i + 1] = ack.getReply(i);
}
// If the mirror has reported that it received a corrupt packet,
// do self-destruct to mark myself bad, instead of making the
// mirror node bad. The mirror is guaranteed to be good without
// corrupt data on disk.
if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
throw new IOException("Shutting down writer and responder "
+ "since the down streams reported the data sent by this "
+ "thread is corrupt");
}
}
PipelineAck replyAck = new PipelineAck(seqno, replies,
totalAckTimeNanos);
if (replyAck.isSuccess()
&& offsetInBlock > replicaInfo.getBytesAcked()) {
replicaInfo.setBytesAcked(offsetInBlock);
}
// send my ack back to upstream datanode
replyAck.write(upstreamOut);
upstreamOut.flush();
if (LOG.isDebugEnabled()) {
LOG.debug(myString + ", replyAck=" + replyAck);
}
// If a corruption was detected in the received data, terminate after
// sending ERROR_CHECKSUM back.
if (myStatus == Status.ERROR_CHECKSUM) {
throw new IOException("Shutting down writer and responder "
+ "due to a checksum error in received data. The error "
+ "response has been sent upstream.");
}
}
/** /**
* Remove a packet from the head of the ack queue * Remove a packet from the head of the ack queue
* *