diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0853d368391..917d3ca7e68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -167,6 +167,9 @@ Trunk (Unreleased) HDFS-4215. Remove locking from addToParent(..) since it is used in image loading, and add INode.isFile(). (szetszwo) + HDFS-4200. Reduce the size of synchronized sections in PacketResponder. + (suresh) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 6995fd2d4ed..cc32224b70f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -638,10 +638,7 @@ class BlockReceiver implements Closeable { responder.start(); // start thread to processes responses } - /* - * Receive until the last packet. - */ - while (receivePacket() >= 0) {} + while (receivePacket() >= 0) { /* Receive until the last packet */ } // wait for all outstanding packet responses. And then // indicate responder to gracefully shutdown. @@ -724,7 +721,7 @@ class BlockReceiver implements Closeable { static private long checksum2long(byte[] checksum) { long crc = 0L; for(int i=0; i ackQueue = new LinkedList(); /** the thread that spawns this responder */ private final Thread receiverThread = Thread.currentThread(); - /** is this responder running? */ + /** is this responder running? - synchronization using monitor lock */ private volatile boolean running = true; - /** input from the next downstream datanode */ private final DataInputStream downstreamIn; /** output to upstream datanode/client */ private final DataOutputStream upstreamOut; - /** The type of this responder */ private final PacketResponderType type; /** for log and error messages */ @@ -812,8 +808,7 @@ class BlockReceiver implements Closeable { } PacketResponder(final DataOutputStream upstreamOut, - final DataInputStream downstreamIn, - final DatanodeInfo[] downstreams) { + final DataInputStream downstreamIn, final DatanodeInfo[] downstreams) { this.downstreamIn = downstreamIn; this.upstreamOut = upstreamOut; @@ -830,23 +825,41 @@ class BlockReceiver implements Closeable { this.myString = b.toString(); } + private boolean isRunning() { + return running && datanode.shouldRun; + } + /** * enqueue the seqno that is still be to acked by the downstream datanode. * @param seqno * @param lastPacketInBlock * @param offsetInBlock */ - synchronized void enqueue(final long seqno, - final boolean lastPacketInBlock, final long offsetInBlock) { - if (running) { - final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock, - System.nanoTime()); - if(LOG.isDebugEnabled()) { - LOG.debug(myString + ": enqueue " + p); - } - ackQueue.addLast(p); - notifyAll(); + void enqueue(final long seqno, final boolean lastPacketInBlock, + final long offsetInBlock) { + final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock, + System.nanoTime()); + if(LOG.isDebugEnabled()) { + LOG.debug(myString + ": enqueue " + p); } + synchronized(this) { + if (running) { + ackQueue.addLast(p); + notifyAll(); + } + } + } + + /** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */ + synchronized Packet waitForAckHead(long seqno) throws InterruptedException { + while (isRunning() && ackQueue.size() == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug(myString + ": seqno=" + seqno + + " waiting for local datanode to finish write."); + } + wait(); + } + return isRunning() ? ackQueue.getFirst() : null; } /** @@ -854,7 +867,7 @@ class BlockReceiver implements Closeable { */ @Override public synchronized void close() { - while (running && ackQueue.size() != 0 && datanode.shouldRun) { + while (isRunning() && ackQueue.size() != 0) { try { wait(); } catch (InterruptedException e) { @@ -877,147 +890,97 @@ class BlockReceiver implements Closeable { public void run() { boolean lastPacketInBlock = false; final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; - while (running && datanode.shouldRun && !lastPacketInBlock) { - + while (isRunning() && !lastPacketInBlock) { long totalAckTimeNanos = 0; boolean isInterrupted = false; try { - Packet pkt = null; - long expected = -2; - PipelineAck ack = new PipelineAck(); - long seqno = PipelineAck.UNKOWN_SEQNO; - long ackRecvNanoTime = 0; - try { - if (type != PacketResponderType.LAST_IN_PIPELINE - && !mirrorError) { - // read an ack from downstream datanode - ack.readFields(downstreamIn); - ackRecvNanoTime = System.nanoTime(); - if (LOG.isDebugEnabled()) { - LOG.debug(myString + " got " + ack); - } - seqno = ack.getSeqno(); + Packet pkt = null; + long expected = -2; + PipelineAck ack = new PipelineAck(); + long seqno = PipelineAck.UNKOWN_SEQNO; + long ackRecvNanoTime = 0; + try { + if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) { + // read an ack from downstream datanode + ack.readFields(downstreamIn); + ackRecvNanoTime = System.nanoTime(); + if (LOG.isDebugEnabled()) { + LOG.debug(myString + " got " + ack); } - if (seqno != PipelineAck.UNKOWN_SEQNO - || type == PacketResponderType.LAST_IN_PIPELINE) { - synchronized (this) { - while (running && datanode.shouldRun && ackQueue.size() == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(myString + ": seqno=" + seqno + - " waiting for local datanode to finish write."); - } - wait(); + seqno = ack.getSeqno(); + } + if (seqno != PipelineAck.UNKOWN_SEQNO + || type == PacketResponderType.LAST_IN_PIPELINE) { + pkt = waitForAckHead(seqno); + if (!isRunning()) { + break; + } + expected = pkt.seqno; + if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE + && seqno != expected) { + throw new IOException(myString + "seqno: expected=" + expected + + ", received=" + seqno); + } + if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) { + // The total ack time includes the ack times of downstream + // nodes. + // The value is 0 if this responder doesn't have a downstream + // DN in the pipeline. + totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime; + // Report the elapsed time from ack send to ack receive minus + // the downstream ack time. + long ackTimeNanos = totalAckTimeNanos + - ack.getDownstreamAckTimeNanos(); + if (ackTimeNanos < 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Calculated invalid ack time: " + ackTimeNanos + + "ns."); } - if (!running || !datanode.shouldRun) { - break; - } - pkt = ackQueue.getFirst(); - expected = pkt.seqno; - if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE - && seqno != expected) { - throw new IOException(myString + "seqno: expected=" - + expected + ", received=" + seqno); - } - if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) { - // The total ack time includes the ack times of downstream nodes. - // The value is 0 if this responder doesn't have a downstream - // DN in the pipeline. - totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime; - // Report the elapsed time from ack send to ack receive minus - // the downstream ack time. - long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos(); - if (ackTimeNanos < 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns."); - } - } else { - datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos); - } - } - lastPacketInBlock = pkt.lastPacketInBlock; + } else { + datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos); } } - } catch (InterruptedException ine) { + lastPacketInBlock = pkt.lastPacketInBlock; + } + } catch (InterruptedException ine) { + isInterrupted = true; + } catch (IOException ioe) { + if (Thread.interrupted()) { isInterrupted = true; - } catch (IOException ioe) { - if (Thread.interrupted()) { - isInterrupted = true; - } else { - // continue to run even if can not read from mirror - // notify client of the error - // and wait for the client to shut down the pipeline - mirrorError = true; - LOG.info(myString, ioe); - } - } - - if (Thread.interrupted() || isInterrupted) { - /* The receiver thread cancelled this thread. - * We could also check any other status updates from the - * receiver thread (e.g. if it is ok to write to replyOut). - * It is prudent to not send any more status back to the client - * because this datanode has a problem. The upstream datanode - * will detect that this datanode is bad, and rightly so. - */ - LOG.info(myString + ": Thread is interrupted."); - running = false; - continue; - } - - // If this is the last packet in block, then close block - // file and finalize the block before responding success - if (lastPacketInBlock) { - BlockReceiver.this.close(); - final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; - block.setNumBytes(replicaInfo.getNumBytes()); - datanode.data.finalizeBlock(block); - datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT); - if (ClientTraceLog.isInfoEnabled() && isClient) { - long offset = 0; - DatanodeRegistration dnR = - datanode.getDNRegistrationForBP(block.getBlockPoolId()); - ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, - inAddr, myAddr, block.getNumBytes(), - "HDFS_WRITE", clientname, offset, - dnR.getStorageID(), block, endTime-startTime)); - } else { - LOG.info("Received " + block + " size " - + block.getNumBytes() + " from " + inAddr); - } - } - - // construct my ack message - Status[] replies = null; - if (mirrorError) { // ack read error - replies = new Status[2]; - replies[0] = Status.SUCCESS; - replies[1] = Status.ERROR; } else { - short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0 - : ack.getNumOfReplies(); - replies = new Status[1+ackLen]; - replies[0] = Status.SUCCESS; - for (int i=0; i replicaInfo.getBytesAcked()) - replicaInfo.setBytesAcked(pkt.offsetInBlock); + } - // send my ack back to upstream datanode - replyAck.write(upstreamOut); - upstreamOut.flush(); - if (LOG.isDebugEnabled()) { - LOG.debug(myString + ", replyAck=" + replyAck); - } - if (pkt != null) { - // remove the packet from the ack queue - removeAckHead(); - // update bytes acked - } + if (Thread.interrupted() || isInterrupted) { + /* + * The receiver thread cancelled this thread. We could also check + * any other status updates from the receiver thread (e.g. if it is + * ok to write to replyOut). It is prudent to not send any more + * status back to the client because this datanode has a problem. + * The upstream datanode will detect that this datanode is bad, and + * rightly so. + */ + LOG.info(myString + ": Thread is interrupted."); + running = false; + continue; + } + + if (lastPacketInBlock) { + // Finalize the block and close the block file + finalizeBlock(startTime); + } + + sendAckUpstream(ack, expected, totalAckTimeNanos, + (pkt != null ? pkt.offsetInBlock : 0)); + if (pkt != null) { + // remove the packet from the ack queue + removeAckHead(); + } } catch (IOException e) { LOG.warn("IOException in BlockReceiver.run(): ", e); if (running) { @@ -1043,6 +1006,66 @@ class BlockReceiver implements Closeable { LOG.info(myString + " terminating"); } + /** + * Finalize the block and close the block file + * @param startTime time when BlockReceiver started receiving the block + */ + private void finalizeBlock(long startTime) throws IOException { + BlockReceiver.this.close(); + final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() + : 0; + block.setNumBytes(replicaInfo.getNumBytes()); + datanode.data.finalizeBlock(block); + datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT); + if (ClientTraceLog.isInfoEnabled() && isClient) { + long offset = 0; + DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block + .getBlockPoolId()); + ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr, + myAddr, block.getNumBytes(), "HDFS_WRITE", clientname, offset, + dnR.getStorageID(), block, endTime - startTime)); + } else { + LOG.info("Received " + block + " size " + block.getNumBytes() + + " from " + inAddr); + } + } + + /** + * @param ack Ack received from downstream + * @param seqno sequence number of ack to be sent upstream + * @param totalAckTimeNanos total ack time including all the downstream + * nodes + * @param offsetInBlock offset in block for the data in packet + */ + private void sendAckUpstream(PipelineAck ack, long seqno, + long totalAckTimeNanos, long offsetInBlock) throws IOException { + Status[] replies = null; + if (mirrorError) { // ack read error + replies = MIRROR_ERROR_STATUS; + } else { + short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack + .getNumOfReplies(); + replies = new Status[1 + ackLen]; + replies[0] = Status.SUCCESS; + for (int i = 0; i < ackLen; i++) { + replies[i + 1] = ack.getReply(i); + } + } + PipelineAck replyAck = new PipelineAck(seqno, replies, + totalAckTimeNanos); + if (replyAck.isSuccess() + && offsetInBlock > replicaInfo.getBytesAcked()) { + replicaInfo.setBytesAcked(offsetInBlock); + } + + // send my ack back to upstream datanode + replyAck.write(upstreamOut); + upstreamOut.flush(); + if (LOG.isDebugEnabled()) { + LOG.debug(myString + ", replyAck=" + replyAck); + } + } + /** * Remove a packet from the head of the ack queue *