HDFS-5374. Remove deadcode in DFSOutputStream. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1533258 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f79cbb9ba6
commit
631ccbdd20
@ -324,6 +324,8 @@ Release 2.3.0 - UNRELEASED
|
|||||||
HDFS-5130. Add test for snapshot related FsShell and DFSAdmin commands.
|
HDFS-5130. Add test for snapshot related FsShell and DFSAdmin commands.
|
||||||
(Binglin Chang via jing9)
|
(Binglin Chang via jing9)
|
||||||
|
|
||||||
|
HDFS-5374. Remove deadcode in DFSOutputStream. (suresh)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
|
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
|
||||||
|
@ -47,7 +47,6 @@
|
|||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||||
import org.apache.hadoop.fs.Syncable;
|
import org.apache.hadoop.fs.Syncable;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
||||||
@ -138,7 +137,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||||||
private long currentSeqno = 0;
|
private long currentSeqno = 0;
|
||||||
private long lastQueuedSeqno = -1;
|
private long lastQueuedSeqno = -1;
|
||||||
private long lastAckedSeqno = -1;
|
private long lastAckedSeqno = -1;
|
||||||
private long bytesCurBlock = 0; // bytes writen in current block
|
private long bytesCurBlock = 0; // bytes written in current block
|
||||||
private int packetSize = 0; // write packet size, not including the header.
|
private int packetSize = 0; // write packet size, not including the header.
|
||||||
private int chunksPerPacket = 0;
|
private int chunksPerPacket = 0;
|
||||||
private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
|
private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
|
||||||
@ -458,8 +457,7 @@ public void run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Packet one = null;
|
Packet one;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// process datanode IO errors if any
|
// process datanode IO errors if any
|
||||||
boolean doSleep = false;
|
boolean doSleep = false;
|
||||||
@ -504,7 +502,7 @@ public void run() {
|
|||||||
if(DFSClient.LOG.isDebugEnabled()) {
|
if(DFSClient.LOG.isDebugEnabled()) {
|
||||||
DFSClient.LOG.debug("Allocating new block");
|
DFSClient.LOG.debug("Allocating new block");
|
||||||
}
|
}
|
||||||
nodes = nextBlockOutputStream(src);
|
nodes = nextBlockOutputStream();
|
||||||
initDataStreaming();
|
initDataStreaming();
|
||||||
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
|
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
|
||||||
if(DFSClient.LOG.isDebugEnabled()) {
|
if(DFSClient.LOG.isDebugEnabled()) {
|
||||||
@ -569,9 +567,6 @@ public void run() {
|
|||||||
}
|
}
|
||||||
lastPacket = Time.now();
|
lastPacket = Time.now();
|
||||||
|
|
||||||
if (one.isHeartbeatPacket()) { //heartbeat packet
|
|
||||||
}
|
|
||||||
|
|
||||||
// update bytesSent
|
// update bytesSent
|
||||||
long tmpBytesSent = one.getLastByteOffsetBlock();
|
long tmpBytesSent = one.getLastByteOffsetBlock();
|
||||||
if (bytesSent < tmpBytesSent) {
|
if (bytesSent < tmpBytesSent) {
|
||||||
@ -690,7 +685,7 @@ private void closeStream() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Processes reponses from the datanodes. A packet is removed
|
// Processes responses from the datanodes. A packet is removed
|
||||||
// from the ackQueue when its response arrives.
|
// from the ackQueue when its response arrives.
|
||||||
//
|
//
|
||||||
private class ResponseProcessor extends Daemon {
|
private class ResponseProcessor extends Daemon {
|
||||||
@ -732,18 +727,18 @@ public void run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
assert seqno != PipelineAck.UNKOWN_SEQNO :
|
assert seqno != PipelineAck.UNKOWN_SEQNO :
|
||||||
"Ack for unkown seqno should be a failed ack: " + ack;
|
"Ack for unknown seqno should be a failed ack: " + ack;
|
||||||
if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
|
if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// a success ack for a data packet
|
// a success ack for a data packet
|
||||||
Packet one = null;
|
Packet one;
|
||||||
synchronized (dataQueue) {
|
synchronized (dataQueue) {
|
||||||
one = ackQueue.getFirst();
|
one = ackQueue.getFirst();
|
||||||
}
|
}
|
||||||
if (one.seqno != seqno) {
|
if (one.seqno != seqno) {
|
||||||
throw new IOException("Responseprocessor: Expecting seqno " +
|
throw new IOException("ResponseProcessor: Expecting seqno " +
|
||||||
" for block " + block +
|
" for block " + block +
|
||||||
one.seqno + " but received " + seqno);
|
one.seqno + " but received " + seqno);
|
||||||
}
|
}
|
||||||
@ -1052,7 +1047,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
|
|||||||
* Must get block ID and the IDs of the destinations from the namenode.
|
* Must get block ID and the IDs of the destinations from the namenode.
|
||||||
* Returns the list of target datanodes.
|
* Returns the list of target datanodes.
|
||||||
*/
|
*/
|
||||||
private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
|
private DatanodeInfo[] nextBlockOutputStream() throws IOException {
|
||||||
LocatedBlock lb = null;
|
LocatedBlock lb = null;
|
||||||
DatanodeInfo[] nodes = null;
|
DatanodeInfo[] nodes = null;
|
||||||
int count = dfsClient.getConf().nBlockWriteRetry;
|
int count = dfsClient.getConf().nBlockWriteRetry;
|
||||||
@ -1210,8 +1205,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private LocatedBlock locateFollowingBlock(long start,
|
private LocatedBlock locateFollowingBlock(long start,
|
||||||
DatanodeInfo[] excludedNodes)
|
DatanodeInfo[] excludedNodes) throws IOException {
|
||||||
throws IOException, UnresolvedLinkException {
|
|
||||||
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
|
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
|
||||||
long sleeptime = 400;
|
long sleeptime = 400;
|
||||||
while (true) {
|
while (true) {
|
||||||
@ -1283,7 +1277,7 @@ private void setLastException(IOException e) {
|
|||||||
* Create a socket for a write pipeline
|
* Create a socket for a write pipeline
|
||||||
* @param first the first datanode
|
* @param first the first datanode
|
||||||
* @param length the pipeline length
|
* @param length the pipeline length
|
||||||
* @param client
|
* @param client client
|
||||||
* @return the socket connected to the first datanode
|
* @return the socket connected to the first datanode
|
||||||
*/
|
*/
|
||||||
static Socket createSocketForPipeline(final DatanodeInfo first,
|
static Socket createSocketForPipeline(final DatanodeInfo first,
|
||||||
@ -1475,7 +1469,7 @@ private void waitAndQueueCurrentPacket() throws IOException {
|
|||||||
//
|
//
|
||||||
// Rather than wait around for space in the queue, we should instead try to
|
// Rather than wait around for space in the queue, we should instead try to
|
||||||
// return to the caller as soon as possible, even though we slightly overrun
|
// return to the caller as soon as possible, even though we slightly overrun
|
||||||
// the MAX_PACKETS iength.
|
// the MAX_PACKETS length.
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1696,7 +1690,7 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// If 1) any new blocks were allocated since the last flush, or 2) to
|
// If 1) any new blocks were allocated since the last flush, or 2) to
|
||||||
// update length in NN is requried, then persist block locations on
|
// update length in NN is required, then persist block locations on
|
||||||
// namenode.
|
// namenode.
|
||||||
if (persistBlocks.getAndSet(false) || updateLength) {
|
if (persistBlocks.getAndSet(false) || updateLength) {
|
||||||
try {
|
try {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user