HDFS-5375. Merge 1533258 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1533259 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 2013-10-17 20:48:39 +00:00
parent 2c1afcf3d0
commit 341272da8b
2 changed files with 14 additions and 18 deletions

CHANGES.txt

@@ -83,6 +83,8 @@ Release 2.3.0 - UNRELEASED
     HDFS-5130. Add test for snapshot related FsShell and DFSAdmin commands.
     (Binglin Chang via jing9)
 
+    HDFS-5374. Remove deadcode in DFSOutputStream. (suresh)
+
   OPTIMIZATIONS
 
     HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)

DFSOutputStream.java

@@ -47,7 +47,6 @@
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
@@ -138,7 +137,7 @@ public class DFSOutputStream extends FSOutputSummer
   private long currentSeqno = 0;
   private long lastQueuedSeqno = -1;
   private long lastAckedSeqno = -1;
-  private long bytesCurBlock = 0; // bytes writen in current block
+  private long bytesCurBlock = 0; // bytes written in current block
   private int packetSize = 0; // write packet size, not including the header.
   private int chunksPerPacket = 0;
   private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
@@ -466,8 +465,7 @@ public void run() {
           }
         }
 
-        Packet one = null;
-
+        Packet one;
         try {
           // process datanode IO errors if any
           boolean doSleep = false;
@@ -511,7 +509,7 @@ public void run() {
             if(DFSClient.LOG.isDebugEnabled()) {
               DFSClient.LOG.debug("Allocating new block");
             }
-            nodes = nextBlockOutputStream(src);
+            nodes = nextBlockOutputStream();
             initDataStreaming();
           } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
             if(DFSClient.LOG.isDebugEnabled()) {
@@ -575,9 +573,6 @@ public void run() {
           }
           lastPacket = Time.now();
 
-          if (one.isHeartbeatPacket()) { //heartbeat packet
-          }
-
           // update bytesSent
           long tmpBytesSent = one.getLastByteOffsetBlock();
           if (bytesSent < tmpBytesSent) {
@@ -695,7 +690,7 @@ private void closeStream() {
     }
 
     //
-    // Processes reponses from the datanodes. A packet is removed
+    // Processes responses from the datanodes. A packet is removed
     // from the ackQueue when its response arrives.
     //
     private class ResponseProcessor extends Daemon {
@@ -737,18 +732,18 @@ public void run() {
             }
 
             assert seqno != PipelineAck.UNKOWN_SEQNO :
-              "Ack for unkown seqno should be a failed ack: " + ack;
+              "Ack for unknown seqno should be a failed ack: " + ack;
             if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
               continue;
             }
 
             // a success ack for a data packet
-            Packet one = null;
+            Packet one;
             synchronized (dataQueue) {
               one = ackQueue.getFirst();
             }
             if (one.seqno != seqno) {
-              throw new IOException("Responseprocessor: Expecting seqno " +
+              throw new IOException("ResponseProcessor: Expecting seqno " +
                                     " for block " + block +
                                     one.seqno + " but received " + seqno);
             }
@@ -1057,7 +1052,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
      * Must get block ID and the IDs of the destinations from the namenode.
      * Returns the list of target datanodes.
      */
-    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
+    private DatanodeInfo[] nextBlockOutputStream() throws IOException {
       LocatedBlock lb = null;
       DatanodeInfo[] nodes = null;
       int count = dfsClient.getConf().nBlockWriteRetry;
@@ -1215,8 +1210,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
     }
 
     private LocatedBlock locateFollowingBlock(long start,
-        DatanodeInfo[] excludedNodes)
-        throws IOException, UnresolvedLinkException {
+        DatanodeInfo[] excludedNodes) throws IOException {
       int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
       long sleeptime = 400;
       while (true) {
@@ -1287,7 +1281,7 @@ private void setLastException(IOException e) {
    * Create a socket for a write pipeline
    * @param first the first datanode
    * @param length the pipeline length
-   * @param client
+   * @param client client
    * @return the socket connected to the first datanode
    */
  static Socket createSocketForPipeline(final DatanodeInfo first,
@@ -1479,7 +1473,7 @@ private void waitAndQueueCurrentPacket() throws IOException {
           //
           // Rather than wait around for space in the queue, we should instead try to
           // return to the caller as soon as possible, even though we slightly overrun
-          // the MAX_PACKETS iength.
+          // the MAX_PACKETS length.
           Thread.currentThread().interrupt();
           break;
         }
@@ -1705,7 +1699,7 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
         }
       }
       // If 1) any new blocks were allocated since the last flush, or 2) to
-      // update length in NN is requried, then persist block locations on
+      // update length in NN is required, then persist block locations on
       // namenode.
       if (persistBlocks.getAndSet(false) || updateLength) {
         try {