HDFS-3851. DFSOutputStream class code cleanup. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1377372 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-08-26 04:00:26 +00:00
parent 56285e1bee
commit 735046ebec
2 changed files with 36 additions and 35 deletions

View File

@ -129,6 +129,8 @@ Trunk (unreleased changes)
HDFS-3844. Add @Override and remove {@inheritdoc} and unnecessary HDFS-3844. Add @Override and remove {@inheritdoc} and unnecessary
imports. (Jing Zhao via suresh) imports. (Jing Zhao via suresh)
HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -56,8 +56,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@ -107,8 +107,8 @@ import com.google.common.annotations.VisibleForTesting;
****************************************************************/ ****************************************************************/
@InterfaceAudience.Private @InterfaceAudience.Private
public class DFSOutputStream extends FSOutputSummer implements Syncable { public class DFSOutputStream extends FSOutputSummer implements Syncable {
private final DFSClient dfsClient;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
private final DFSClient dfsClient;
private Socket s; private Socket s;
// closed is accessed by different threads under different locks. // closed is accessed by different threads under different locks.
private volatile boolean closed = false; private volatile boolean closed = false;
@ -138,15 +138,15 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
private final short blockReplication; // replication factor of file private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close private boolean shouldSyncBlock = false; // force blocks to disk upon close
private class Packet { private static class Packet {
private static final long HEART_BEAT_SEQNO = -1L;
long seqno; // sequencenumber of buffer in block long seqno; // sequencenumber of buffer in block
long offsetInBlock; // offset in block final long offsetInBlock; // offset in block
private boolean lastPacketInBlock; // is this the last packet in block?
boolean syncBlock; // this packet forces the current block to disk boolean syncBlock; // this packet forces the current block to disk
int numChunks; // number of chunks currently in packet int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet final int maxChunks; // max chunks in packet
byte[] buf; byte[] buf;
private boolean lastPacketInBlock; // is this the last packet in block?
/** /**
* buf is pointed into like follows: * buf is pointed into like follows:
@ -164,45 +164,36 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
*/ */
int checksumStart; int checksumStart;
int checksumPos; int checksumPos;
int dataStart; final int dataStart;
int dataPos; int dataPos;
private static final long HEART_BEAT_SEQNO = -1L;
/** /**
* Create a heartbeat packet. * Create a heartbeat packet.
*/ */
Packet() { Packet(int checksumSize) {
this.lastPacketInBlock = false; this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
this.numChunks = 0;
this.offsetInBlock = 0;
this.seqno = HEART_BEAT_SEQNO;
buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
maxChunks = 0;
} }
/** /**
* Create a new packet. * Create a new packet.
* *
* @param pktSize maximum size of the packet, including checksum data and actual data. * @param pktSize maximum size of the packet,
* including checksum data and actual data.
* @param chunksPerPkt maximum number of chunks per packet. * @param chunksPerPkt maximum number of chunks per packet.
* @param offsetInBlock offset in bytes into the HDFS block. * @param offsetInBlock offset in bytes into the HDFS block.
*/ */
Packet(int pktSize, int chunksPerPkt, long offsetInBlock) { Packet(int pktSize, int chunksPerPkt, long offsetInBlock,
long seqno, int checksumSize) {
this.lastPacketInBlock = false; this.lastPacketInBlock = false;
this.numChunks = 0; this.numChunks = 0;
this.offsetInBlock = offsetInBlock; this.offsetInBlock = offsetInBlock;
this.seqno = currentSeqno; this.seqno = seqno;
currentSeqno++;
buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize]; buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
checksumStart = PacketHeader.PKT_MAX_HEADER_LEN; checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
checksumPos = checksumStart; checksumPos = checksumStart;
dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize()); dataStart = checksumStart + (chunksPerPkt * checksumSize);
dataPos = dataStart; dataPos = dataStart;
maxChunks = chunksPerPkt; maxChunks = chunksPerPkt;
} }
@ -412,6 +403,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
response.join(); response.join();
response = null; response = null;
} catch (InterruptedException e) { } catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
} }
} }
@ -439,6 +431,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
try { try {
dataQueue.wait(timeout); dataQueue.wait(timeout);
} catch (InterruptedException e) { } catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
} }
doSleep = false; doSleep = false;
now = Time.now(); now = Time.now();
@ -448,7 +441,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
} }
// get packet to be sent. // get packet to be sent.
if (dataQueue.isEmpty()) { if (dataQueue.isEmpty()) {
one = new Packet(); // heartbeat packet one = new Packet(checksum.getChecksumSize()); // heartbeat packet
} else { } else {
one = dataQueue.getFirst(); // regular data packet one = dataQueue.getFirst(); // regular data packet
} }
@ -488,6 +481,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
// wait for acks to arrive from datanodes // wait for acks to arrive from datanodes
dataQueue.wait(1000); dataQueue.wait(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
} }
} }
} }
@ -607,6 +601,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
response.close(); response.close();
response.join(); response.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
} finally { } finally {
response = null; response = null;
} }
@ -1178,6 +1173,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
Thread.sleep(sleeptime); Thread.sleep(sleeptime);
sleeptime *= 2; sleeptime *= 2;
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
DFSClient.LOG.warn("Caught exception ", ie);
} }
} }
} else { } else {
@ -1421,7 +1417,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
if (currentPacket == null) { if (currentPacket == null) {
currentPacket = new Packet(packetSize, chunksPerPacket, currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock); bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.seqno + currentPacket.seqno +
@ -1468,7 +1464,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
// indicate the end of block and reset bytesCurBlock. // indicate the end of block and reset bytesCurBlock.
// //
if (bytesCurBlock == blockSize) { if (bytesCurBlock == blockSize) {
currentPacket = new Packet(0, 0, bytesCurBlock); currentPacket = new Packet(0, 0, bytesCurBlock,
currentSeqno++, this.checksum.getChecksumSize());
currentPacket.lastPacketInBlock = true; currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock; currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket(); waitAndQueueCurrentPacket();
@ -1540,7 +1537,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
// but sync was requested. // but sync was requested.
// Send an empty packet // Send an empty packet
currentPacket = new Packet(packetSize, chunksPerPacket, currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock); bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
} }
} else { } else {
// We already flushed up to this offset. // We already flushed up to this offset.
@ -1557,7 +1554,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
// and sync was requested. // and sync was requested.
// So send an empty sync packet. // So send an empty sync packet.
currentPacket = new Packet(packetSize, chunksPerPacket, currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock); bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
} else { } else {
// just discard the current packet since it is already been sent. // just discard the current packet since it is already been sent.
currentPacket = null; currentPacket = null;
@ -1738,7 +1735,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
if (bytesCurBlock != 0) { if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block // send an empty packet to mark the end of the block
currentPacket = new Packet(0, 0, bytesCurBlock); currentPacket = new Packet(0, 0, bytesCurBlock,
currentSeqno++, this.checksum.getChecksumSize());
currentPacket.lastPacketInBlock = true; currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock; currentPacket.syncBlock = shouldSyncBlock;
} }
@ -1778,6 +1776,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
DFSClient.LOG.info("Could not complete file " + src + " retrying..."); DFSClient.LOG.info("Could not complete file " + src + " retrying...");
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
DFSClient.LOG.warn("Caught exception ", ie);
} }
} }
} }