HDFS-4200. Reduce the size of synchronized sections in PacketResponder. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1413826 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
68bc61ab5c
commit
7e56bfe405
|
@ -167,6 +167,9 @@ Trunk (Unreleased)
|
||||||
HDFS-4215. Remove locking from addToParent(..) since it is used in image
|
HDFS-4215. Remove locking from addToParent(..) since it is used in image
|
||||||
loading, and add INode.isFile(). (szetszwo)
|
loading, and add INode.isFile(). (szetszwo)
|
||||||
|
|
||||||
|
HDFS-4200. Reduce the size of synchronized sections in PacketResponder.
|
||||||
|
(suresh)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -638,10 +638,7 @@ class BlockReceiver implements Closeable {
|
||||||
responder.start(); // start thread to processes responses
|
responder.start(); // start thread to processes responses
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
while (receivePacket() >= 0) { /* Receive until the last packet */ }
|
||||||
* Receive until the last packet.
|
|
||||||
*/
|
|
||||||
while (receivePacket() >= 0) {}
|
|
||||||
|
|
||||||
// wait for all outstanding packet responses. And then
|
// wait for all outstanding packet responses. And then
|
||||||
// indicate responder to gracefully shutdown.
|
// indicate responder to gracefully shutdown.
|
||||||
|
@ -724,7 +721,7 @@ class BlockReceiver implements Closeable {
|
||||||
static private long checksum2long(byte[] checksum) {
|
static private long checksum2long(byte[] checksum) {
|
||||||
long crc = 0L;
|
long crc = 0L;
|
||||||
for(int i=0; i<checksum.length; i++) {
|
for(int i=0; i<checksum.length; i++) {
|
||||||
crc |= (0xffL&(long)checksum[i])<<((checksum.length-i-1)*8);
|
crc |= (0xffL&checksum[i])<<((checksum.length-i-1)*8);
|
||||||
}
|
}
|
||||||
return crc;
|
return crc;
|
||||||
}
|
}
|
||||||
|
@ -783,24 +780,23 @@ class BlockReceiver implements Closeable {
|
||||||
NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
|
NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS, Status.ERROR};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Processed responses from downstream datanodes in the pipeline
|
* Processed responses from downstream datanodes in the pipeline
|
||||||
* and sends back replies to the originator.
|
* and sends back replies to the originator.
|
||||||
*/
|
*/
|
||||||
class PacketResponder implements Runnable, Closeable {
|
class PacketResponder implements Runnable, Closeable {
|
||||||
|
/** queue for packets waiting for ack - synchronization using monitor lock */
|
||||||
/** queue for packets waiting for ack */
|
|
||||||
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
|
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
|
||||||
/** the thread that spawns this responder */
|
/** the thread that spawns this responder */
|
||||||
private final Thread receiverThread = Thread.currentThread();
|
private final Thread receiverThread = Thread.currentThread();
|
||||||
/** is this responder running? */
|
/** is this responder running? - synchronization using monitor lock */
|
||||||
private volatile boolean running = true;
|
private volatile boolean running = true;
|
||||||
|
|
||||||
/** input from the next downstream datanode */
|
/** input from the next downstream datanode */
|
||||||
private final DataInputStream downstreamIn;
|
private final DataInputStream downstreamIn;
|
||||||
/** output to upstream datanode/client */
|
/** output to upstream datanode/client */
|
||||||
private final DataOutputStream upstreamOut;
|
private final DataOutputStream upstreamOut;
|
||||||
|
|
||||||
/** The type of this responder */
|
/** The type of this responder */
|
||||||
private final PacketResponderType type;
|
private final PacketResponderType type;
|
||||||
/** for log and error messages */
|
/** for log and error messages */
|
||||||
|
@ -812,8 +808,7 @@ class BlockReceiver implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
PacketResponder(final DataOutputStream upstreamOut,
|
PacketResponder(final DataOutputStream upstreamOut,
|
||||||
final DataInputStream downstreamIn,
|
final DataInputStream downstreamIn, final DatanodeInfo[] downstreams) {
|
||||||
final DatanodeInfo[] downstreams) {
|
|
||||||
this.downstreamIn = downstreamIn;
|
this.downstreamIn = downstreamIn;
|
||||||
this.upstreamOut = upstreamOut;
|
this.upstreamOut = upstreamOut;
|
||||||
|
|
||||||
|
@ -830,31 +825,49 @@ class BlockReceiver implements Closeable {
|
||||||
this.myString = b.toString();
|
this.myString = b.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isRunning() {
|
||||||
|
return running && datanode.shouldRun;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* enqueue the seqno that is still be to acked by the downstream datanode.
|
* enqueue the seqno that is still be to acked by the downstream datanode.
|
||||||
* @param seqno
|
* @param seqno
|
||||||
* @param lastPacketInBlock
|
* @param lastPacketInBlock
|
||||||
* @param offsetInBlock
|
* @param offsetInBlock
|
||||||
*/
|
*/
|
||||||
synchronized void enqueue(final long seqno,
|
void enqueue(final long seqno, final boolean lastPacketInBlock,
|
||||||
final boolean lastPacketInBlock, final long offsetInBlock) {
|
final long offsetInBlock) {
|
||||||
if (running) {
|
|
||||||
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
|
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
|
||||||
System.nanoTime());
|
System.nanoTime());
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
LOG.debug(myString + ": enqueue " + p);
|
LOG.debug(myString + ": enqueue " + p);
|
||||||
}
|
}
|
||||||
|
synchronized(this) {
|
||||||
|
if (running) {
|
||||||
ackQueue.addLast(p);
|
ackQueue.addLast(p);
|
||||||
notifyAll();
|
notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */
|
||||||
|
synchronized Packet waitForAckHead(long seqno) throws InterruptedException {
|
||||||
|
while (isRunning() && ackQueue.size() == 0) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(myString + ": seqno=" + seqno +
|
||||||
|
" waiting for local datanode to finish write.");
|
||||||
|
}
|
||||||
|
wait();
|
||||||
|
}
|
||||||
|
return isRunning() ? ackQueue.getFirst() : null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* wait for all pending packets to be acked. Then shutdown thread.
|
* wait for all pending packets to be acked. Then shutdown thread.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized void close() {
|
public synchronized void close() {
|
||||||
while (running && ackQueue.size() != 0 && datanode.shouldRun) {
|
while (isRunning() && ackQueue.size() != 0) {
|
||||||
try {
|
try {
|
||||||
wait();
|
wait();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -877,8 +890,7 @@ class BlockReceiver implements Closeable {
|
||||||
public void run() {
|
public void run() {
|
||||||
boolean lastPacketInBlock = false;
|
boolean lastPacketInBlock = false;
|
||||||
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
||||||
while (running && datanode.shouldRun && !lastPacketInBlock) {
|
while (isRunning() && !lastPacketInBlock) {
|
||||||
|
|
||||||
long totalAckTimeNanos = 0;
|
long totalAckTimeNanos = 0;
|
||||||
boolean isInterrupted = false;
|
boolean isInterrupted = false;
|
||||||
try {
|
try {
|
||||||
|
@ -888,8 +900,7 @@ class BlockReceiver implements Closeable {
|
||||||
long seqno = PipelineAck.UNKOWN_SEQNO;
|
long seqno = PipelineAck.UNKOWN_SEQNO;
|
||||||
long ackRecvNanoTime = 0;
|
long ackRecvNanoTime = 0;
|
||||||
try {
|
try {
|
||||||
if (type != PacketResponderType.LAST_IN_PIPELINE
|
if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
|
||||||
&& !mirrorError) {
|
|
||||||
// read an ack from downstream datanode
|
// read an ack from downstream datanode
|
||||||
ack.readFields(downstreamIn);
|
ack.readFields(downstreamIn);
|
||||||
ackRecvNanoTime = System.nanoTime();
|
ackRecvNanoTime = System.nanoTime();
|
||||||
|
@ -900,35 +911,30 @@ class BlockReceiver implements Closeable {
|
||||||
}
|
}
|
||||||
if (seqno != PipelineAck.UNKOWN_SEQNO
|
if (seqno != PipelineAck.UNKOWN_SEQNO
|
||||||
|| type == PacketResponderType.LAST_IN_PIPELINE) {
|
|| type == PacketResponderType.LAST_IN_PIPELINE) {
|
||||||
synchronized (this) {
|
pkt = waitForAckHead(seqno);
|
||||||
while (running && datanode.shouldRun && ackQueue.size() == 0) {
|
if (!isRunning()) {
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(myString + ": seqno=" + seqno +
|
|
||||||
" waiting for local datanode to finish write.");
|
|
||||||
}
|
|
||||||
wait();
|
|
||||||
}
|
|
||||||
if (!running || !datanode.shouldRun) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
pkt = ackQueue.getFirst();
|
|
||||||
expected = pkt.seqno;
|
expected = pkt.seqno;
|
||||||
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
|
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
|
||||||
&& seqno != expected) {
|
&& seqno != expected) {
|
||||||
throw new IOException(myString + "seqno: expected="
|
throw new IOException(myString + "seqno: expected=" + expected
|
||||||
+ expected + ", received=" + seqno);
|
+ ", received=" + seqno);
|
||||||
}
|
}
|
||||||
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
|
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
|
||||||
// The total ack time includes the ack times of downstream nodes.
|
// The total ack time includes the ack times of downstream
|
||||||
|
// nodes.
|
||||||
// The value is 0 if this responder doesn't have a downstream
|
// The value is 0 if this responder doesn't have a downstream
|
||||||
// DN in the pipeline.
|
// DN in the pipeline.
|
||||||
totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
|
totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
|
||||||
// Report the elapsed time from ack send to ack receive minus
|
// Report the elapsed time from ack send to ack receive minus
|
||||||
// the downstream ack time.
|
// the downstream ack time.
|
||||||
long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
|
long ackTimeNanos = totalAckTimeNanos
|
||||||
|
- ack.getDownstreamAckTimeNanos();
|
||||||
if (ackTimeNanos < 0) {
|
if (ackTimeNanos < 0) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
|
LOG.debug("Calculated invalid ack time: " + ackTimeNanos
|
||||||
|
+ "ns.");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
|
datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
|
||||||
|
@ -936,7 +942,6 @@ class BlockReceiver implements Closeable {
|
||||||
}
|
}
|
||||||
lastPacketInBlock = pkt.lastPacketInBlock;
|
lastPacketInBlock = pkt.lastPacketInBlock;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} catch (InterruptedException ine) {
|
} catch (InterruptedException ine) {
|
||||||
isInterrupted = true;
|
isInterrupted = true;
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
|
@ -952,71 +957,29 @@ class BlockReceiver implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (Thread.interrupted() || isInterrupted) {
|
if (Thread.interrupted() || isInterrupted) {
|
||||||
/* The receiver thread cancelled this thread.
|
/*
|
||||||
* We could also check any other status updates from the
|
* The receiver thread cancelled this thread. We could also check
|
||||||
* receiver thread (e.g. if it is ok to write to replyOut).
|
* any other status updates from the receiver thread (e.g. if it is
|
||||||
* It is prudent to not send any more status back to the client
|
* ok to write to replyOut). It is prudent to not send any more
|
||||||
* because this datanode has a problem. The upstream datanode
|
* status back to the client because this datanode has a problem.
|
||||||
* will detect that this datanode is bad, and rightly so.
|
* The upstream datanode will detect that this datanode is bad, and
|
||||||
|
* rightly so.
|
||||||
*/
|
*/
|
||||||
LOG.info(myString + ": Thread is interrupted.");
|
LOG.info(myString + ": Thread is interrupted.");
|
||||||
running = false;
|
running = false;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this is the last packet in block, then close block
|
|
||||||
// file and finalize the block before responding success
|
|
||||||
if (lastPacketInBlock) {
|
if (lastPacketInBlock) {
|
||||||
BlockReceiver.this.close();
|
// Finalize the block and close the block file
|
||||||
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
finalizeBlock(startTime);
|
||||||
block.setNumBytes(replicaInfo.getNumBytes());
|
|
||||||
datanode.data.finalizeBlock(block);
|
|
||||||
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
|
|
||||||
if (ClientTraceLog.isInfoEnabled() && isClient) {
|
|
||||||
long offset = 0;
|
|
||||||
DatanodeRegistration dnR =
|
|
||||||
datanode.getDNRegistrationForBP(block.getBlockPoolId());
|
|
||||||
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
|
|
||||||
inAddr, myAddr, block.getNumBytes(),
|
|
||||||
"HDFS_WRITE", clientname, offset,
|
|
||||||
dnR.getStorageID(), block, endTime-startTime));
|
|
||||||
} else {
|
|
||||||
LOG.info("Received " + block + " size "
|
|
||||||
+ block.getNumBytes() + " from " + inAddr);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// construct my ack message
|
sendAckUpstream(ack, expected, totalAckTimeNanos,
|
||||||
Status[] replies = null;
|
(pkt != null ? pkt.offsetInBlock : 0));
|
||||||
if (mirrorError) { // ack read error
|
|
||||||
replies = new Status[2];
|
|
||||||
replies[0] = Status.SUCCESS;
|
|
||||||
replies[1] = Status.ERROR;
|
|
||||||
} else {
|
|
||||||
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
|
|
||||||
: ack.getNumOfReplies();
|
|
||||||
replies = new Status[1+ackLen];
|
|
||||||
replies[0] = Status.SUCCESS;
|
|
||||||
for (int i=0; i<ackLen; i++) {
|
|
||||||
replies[i+1] = ack.getReply(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos);
|
|
||||||
|
|
||||||
if (replyAck.isSuccess() &&
|
|
||||||
pkt.offsetInBlock > replicaInfo.getBytesAcked())
|
|
||||||
replicaInfo.setBytesAcked(pkt.offsetInBlock);
|
|
||||||
|
|
||||||
// send my ack back to upstream datanode
|
|
||||||
replyAck.write(upstreamOut);
|
|
||||||
upstreamOut.flush();
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(myString + ", replyAck=" + replyAck);
|
|
||||||
}
|
|
||||||
if (pkt != null) {
|
if (pkt != null) {
|
||||||
// remove the packet from the ack queue
|
// remove the packet from the ack queue
|
||||||
removeAckHead();
|
removeAckHead();
|
||||||
// update bytes acked
|
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("IOException in BlockReceiver.run(): ", e);
|
LOG.warn("IOException in BlockReceiver.run(): ", e);
|
||||||
|
@ -1043,6 +1006,66 @@ class BlockReceiver implements Closeable {
|
||||||
LOG.info(myString + " terminating");
|
LOG.info(myString + " terminating");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finalize the block and close the block file
|
||||||
|
* @param startTime time when BlockReceiver started receiving the block
|
||||||
|
*/
|
||||||
|
private void finalizeBlock(long startTime) throws IOException {
|
||||||
|
BlockReceiver.this.close();
|
||||||
|
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime()
|
||||||
|
: 0;
|
||||||
|
block.setNumBytes(replicaInfo.getNumBytes());
|
||||||
|
datanode.data.finalizeBlock(block);
|
||||||
|
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
|
||||||
|
if (ClientTraceLog.isInfoEnabled() && isClient) {
|
||||||
|
long offset = 0;
|
||||||
|
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
|
||||||
|
.getBlockPoolId());
|
||||||
|
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
|
||||||
|
myAddr, block.getNumBytes(), "HDFS_WRITE", clientname, offset,
|
||||||
|
dnR.getStorageID(), block, endTime - startTime));
|
||||||
|
} else {
|
||||||
|
LOG.info("Received " + block + " size " + block.getNumBytes()
|
||||||
|
+ " from " + inAddr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param ack Ack received from downstream
|
||||||
|
* @param seqno sequence number of ack to be sent upstream
|
||||||
|
* @param totalAckTimeNanos total ack time including all the downstream
|
||||||
|
* nodes
|
||||||
|
* @param offsetInBlock offset in block for the data in packet
|
||||||
|
*/
|
||||||
|
private void sendAckUpstream(PipelineAck ack, long seqno,
|
||||||
|
long totalAckTimeNanos, long offsetInBlock) throws IOException {
|
||||||
|
Status[] replies = null;
|
||||||
|
if (mirrorError) { // ack read error
|
||||||
|
replies = MIRROR_ERROR_STATUS;
|
||||||
|
} else {
|
||||||
|
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
|
||||||
|
.getNumOfReplies();
|
||||||
|
replies = new Status[1 + ackLen];
|
||||||
|
replies[0] = Status.SUCCESS;
|
||||||
|
for (int i = 0; i < ackLen; i++) {
|
||||||
|
replies[i + 1] = ack.getReply(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
PipelineAck replyAck = new PipelineAck(seqno, replies,
|
||||||
|
totalAckTimeNanos);
|
||||||
|
if (replyAck.isSuccess()
|
||||||
|
&& offsetInBlock > replicaInfo.getBytesAcked()) {
|
||||||
|
replicaInfo.setBytesAcked(offsetInBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
// send my ack back to upstream datanode
|
||||||
|
replyAck.write(upstreamOut);
|
||||||
|
upstreamOut.flush();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(myString + ", replyAck=" + replyAck);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a packet from the head of the ack queue
|
* Remove a packet from the head of the ack queue
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue