HDFS-3851. Merging change r1377372 from trunk to branch-2.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1609857 13f79535-47bb-0310-9956-ffa450edef68
parent 9ced74bdc8
commit 357fbc1dd2
@@ -61,8 +61,8 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@@ -120,9 +120,9 @@ import com.google.common.cache.RemovalNotification;
 @InterfaceAudience.Private
 public class DFSOutputStream extends FSOutputSummer
     implements Syncable, CanSetDropBehind {
-  private final DFSClient dfsClient;
   private final long dfsclientSlowLogThresholdMs;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
+  private final DFSClient dfsClient;
   private Socket s;
   // closed is accessed by different threads under different locks.
   private volatile boolean closed = false;
@@ -155,15 +155,15 @@ public class DFSOutputStream extends FSOutputSummer
   private final AtomicReference<CachingStrategy> cachingStrategy;
   private boolean failPacket = false;
 
-  private class Packet {
-    final long seqno; // sequencenumber of buffer in block
-    final long offsetInBlock; // offset in block
-    private boolean lastPacketInBlock; // is this the last packet in block?
-    boolean syncBlock; // this packet forces the current block to disk
-    int numChunks; // number of chunks currently in packet
-    final int maxChunks; // max chunks in packet
-
+  private static class Packet {
+    private static final long HEART_BEAT_SEQNO = -1L;
+    long seqno; // sequencenumber of buffer in block
+    final long offsetInBlock; // offset in block
+    boolean syncBlock; // this packet forces the current block to disk
+    int numChunks; // number of chunks currently in packet
+    final int maxChunks; // max chunks in packet
+    final byte[] buf;
+    private boolean lastPacketInBlock; // is this the last packet in block?
 
     /**
      * buf is pointed into like follows:
@@ -181,45 +181,36 @@ public class DFSOutputStream extends FSOutputSummer
      */
     int checksumStart;
     int checksumPos;
-    int dataStart;
+    final int dataStart;
     int dataPos;
 
-    private static final long HEART_BEAT_SEQNO = -1L;
-
     /**
      * Create a heartbeat packet.
      */
-    Packet() {
-      this.lastPacketInBlock = false;
-      this.numChunks = 0;
-      this.offsetInBlock = 0;
-      this.seqno = HEART_BEAT_SEQNO;
-
-      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
-
-      checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
-      maxChunks = 0;
+    Packet(int checksumSize) {
+      this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
     }
 
     /**
      * Create a new packet.
      *
-     * @param pktSize maximum size of the packet, including checksum data and actual data.
+     * @param pktSize maximum size of the packet,
+     *                including checksum data and actual data.
     * @param chunksPerPkt maximum number of chunks per packet.
     * @param offsetInBlock offset in bytes into the HDFS block.
     */
-    Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
+    Packet(int pktSize, int chunksPerPkt, long offsetInBlock,
+        long seqno, int checksumSize) {
      this.lastPacketInBlock = false;
      this.numChunks = 0;
      this.offsetInBlock = offsetInBlock;
-      this.seqno = currentSeqno;
-      currentSeqno++;
+      this.seqno = seqno;

      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];

      checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
      checksumPos = checksumStart;
-      dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
+      dataStart = checksumStart + (chunksPerPkt * checksumSize);
      dataPos = dataStart;
      maxChunks = chunksPerPkt;
    }
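Taken together, the two Packet hunks above turn the inner class into a static nested class, move HEART_BEAT_SEQNO inside it, and pass seqno and the checksum size in through the constructor instead of reading them from the enclosing stream; the heartbeat constructor now simply delegates to the general one. A minimal self-contained sketch of that pattern, with names borrowed from the diff and simplified fields (PKT_MAX_HEADER_LEN here is a stand-in constant, not the real PacketHeader value):

// Sketch only: illustrates the constructor-delegation cleanup, not the full Hadoop class.
class PacketSketch {
  // Stand-in for PacketHeader.PKT_MAX_HEADER_LEN; the real value is computed elsewhere.
  static final int PKT_MAX_HEADER_LEN = 33;

  private static class Packet {
    private static final long HEART_BEAT_SEQNO = -1L;
    final long seqno;
    final long offsetInBlock;
    final int maxChunks;
    final byte[] buf;
    final int dataStart;

    /** Heartbeat packet: no data, sentinel sequence number, delegates to the general constructor. */
    Packet(int checksumSize) {
      this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
    }

    /** General packet: the caller now supplies the sequence number and checksum size. */
    Packet(int pktSize, int chunksPerPkt, long offsetInBlock,
           long seqno, int checksumSize) {
      this.offsetInBlock = offsetInBlock;
      this.seqno = seqno;
      this.buf = new byte[PKT_MAX_HEADER_LEN + pktSize];
      this.dataStart = PKT_MAX_HEADER_LEN + (chunksPerPkt * checksumSize);
      this.maxChunks = chunksPerPkt;
    }
  }

  public static void main(String[] args) {
    Packet heartbeat = new Packet(4);                     // 4-byte checksum, hypothetical
    Packet data = new Packet(64 * 1024, 128, 0L, 0L, 4);  // hypothetical packet/chunk sizes
    System.out.println(heartbeat.seqno + " " + data.buf.length);
  }
}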
@@ -476,6 +467,7 @@ public class DFSOutputStream extends FSOutputSummer
           response.join();
           response = null;
         } catch (InterruptedException e) {
+          DFSClient.LOG.warn("Caught exception ", e);
         }
       }
 
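Several of the later hunks (the ones starting at old-file lines 502, 551, 675, 1478, and 2157) make the same small change as the hunk just above: catch blocks for InterruptedException that previously swallowed the exception silently now log it. A minimal sketch of the pattern, assuming a commons-logging Log in the style of DFSClient.LOG (the class and field names here are illustrative, not Hadoop's):

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Sketch only: shows the "log instead of silently swallowing InterruptedException" pattern.
class AckWaiterSketch {
  private static final Log LOG = LogFactory.getLog(AckWaiterSketch.class);
  private final Object dataQueue = new Object();

  void waitForAcks(long timeoutMs) {
    synchronized (dataQueue) {
      try {
        dataQueue.wait(timeoutMs);
      } catch (InterruptedException e) {
        // Previously an empty catch block; the diff adds a warn-level log line here.
        LOG.warn("Caught exception ", e);
      }
    }
  }
}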
@@ -502,6 +494,7 @@ public class DFSOutputStream extends FSOutputSummer
             try {
               dataQueue.wait(timeout);
             } catch (InterruptedException e) {
+              DFSClient.LOG.warn("Caught exception ", e);
             }
             doSleep = false;
             now = Time.now();
@@ -511,7 +504,7 @@ public class DFSOutputStream extends FSOutputSummer
           }
           // get packet to be sent.
           if (dataQueue.isEmpty()) {
-            one = new Packet();  // heartbeat packet
+            one = new Packet(checksum.getChecksumSize());  // heartbeat packet
           } else {
             one = dataQueue.getFirst(); // regular data packet
           }
@@ -551,6 +544,7 @@ public class DFSOutputStream extends FSOutputSummer
                 // wait for acks to arrive from datanodes
                 dataQueue.wait(1000);
               } catch (InterruptedException e) {
+                DFSClient.LOG.warn("Caught exception ", e);
               }
             }
           }
@@ -675,6 +669,7 @@ public class DFSOutputStream extends FSOutputSummer
         response.close();
         response.join();
       } catch (InterruptedException e) {
+        DFSClient.LOG.warn("Caught exception ", e);
       } finally {
         response = null;
       }
@@ -1478,6 +1473,7 @@ public class DFSOutputStream extends FSOutputSummer
               Thread.sleep(sleeptime);
               sleeptime *= 2;
             } catch (InterruptedException ie) {
+              DFSClient.LOG.warn("Caught exception ", ie);
             }
           }
         } else {
@@ -1738,7 +1734,7 @@ public class DFSOutputStream extends FSOutputSummer
 
     if (currentPacket == null) {
       currentPacket = new Packet(packetSize, chunksPerPacket,
-          bytesCurBlock);
+          bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
             currentPacket.seqno +
@@ -1785,7 +1781,8 @@ public class DFSOutputStream extends FSOutputSummer
       // indicate the end of block and reset bytesCurBlock.
       //
       if (bytesCurBlock == blockSize) {
-        currentPacket = new Packet(0, 0, bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock,
+            currentSeqno++, this.checksum.getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
@@ -1886,7 +1883,7 @@ public class DFSOutputStream extends FSOutputSummer
           // but sync was requested.
           // Send an empty packet
           currentPacket = new Packet(packetSize, chunksPerPacket,
-              bytesCurBlock);
+              bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
         }
       } else {
         // We already flushed up to this offset.
@@ -1903,7 +1900,7 @@ public class DFSOutputStream extends FSOutputSummer
           // and sync was requested.
           // So send an empty sync packet.
           currentPacket = new Packet(packetSize, chunksPerPacket,
-              bytesCurBlock);
+              bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
         } else {
           // just discard the current packet since it is already been sent.
           currentPacket = null;
@@ -2107,7 +2104,8 @@ public class DFSOutputStream extends FSOutputSummer
 
       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = new Packet(0, 0, bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock,
+            currentSeqno++, this.checksum.getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
       }
@@ -2157,6 +2155,7 @@ public class DFSOutputStream extends FSOutputSummer
           DFSClient.LOG.info("Could not complete " + src + " retrying...");
         }
       } catch (InterruptedException ie) {
+        DFSClient.LOG.warn("Caught exception ", ie);
       }
     }
   }