HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of subclassing. Contributed by Li Bo

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-04-02 10:59:26 -07:00
parent beb0fd0d60
commit ce3f32590d
3 changed files with 67 additions and 61 deletions

View File

@ -61,6 +61,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7978. Add LOG.isDebugEnabled() guard for some LOG.debug(..).
(Walter Su via wang)
HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of
subclassing. (Li Bo via szetszwo)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.Socket;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -95,29 +94,29 @@ public class DFSOutputStream extends FSOutputSummer
static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS = static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
CryptoProtocolVersion.supported(); CryptoProtocolVersion.supported();
private final DFSClient dfsClient; protected final DFSClient dfsClient;
private final ByteArrayManager byteArrayManager; protected final ByteArrayManager byteArrayManager;
// closed is accessed by different threads under different locks. // closed is accessed by different threads under different locks.
private volatile boolean closed = false; protected volatile boolean closed = false;
private final String src; protected final String src;
private final long fileId; protected final long fileId;
private final long blockSize; protected final long blockSize;
private final int bytesPerChecksum; protected final int bytesPerChecksum;
private DFSPacket currentPacket = null; protected DFSPacket currentPacket = null;
private DataStreamer streamer; protected DataStreamer streamer;
private int packetSize = 0; // write packet size, not including the header. protected int packetSize = 0; // write packet size, not including the header.
private int chunksPerPacket = 0; protected int chunksPerPacket = 0;
private long lastFlushOffset = 0; // offset when flush was invoked protected long lastFlushOffset = 0; // offset when flush was invoked
private long initialFileSize = 0; // at time of file open private long initialFileSize = 0; // at time of file open
private final short blockReplication; // replication factor of file private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close protected boolean shouldSyncBlock = false; // force blocks to disk upon close
private final AtomicReference<CachingStrategy> cachingStrategy; protected final AtomicReference<CachingStrategy> cachingStrategy;
private FileEncryptionInfo fileEncryptionInfo; private FileEncryptionInfo fileEncryptionInfo;
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
long seqno, boolean lastPacketInBlock) throws InterruptedIOException { long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
final byte[] buf; final byte[] buf;
final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize; final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
@ -206,7 +205,7 @@ public class DFSOutputStream extends FSOutputSummer
} }
/** Construct a new output stream for creating a file. */ /** Construct a new output stream for creating a file. */
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress, EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes) throws IOException { DataChecksum checksum, String[] favoredNodes) throws IOException {
this(dfsClient, src, progress, stat, checksum); this(dfsClient, src, progress, stat, checksum);
@ -359,7 +358,7 @@ public class DFSOutputStream extends FSOutputSummer
} }
} }
private void computePacketChunkSize(int psize, int csize) { protected void computePacketChunkSize(int psize, int csize) {
final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN; final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
final int chunkSize = csize + getChecksumSize(); final int chunkSize = csize + getChecksumSize();
chunksPerPacket = Math.max(bodySize/chunkSize, 1); chunksPerPacket = Math.max(bodySize/chunkSize, 1);
@ -426,9 +425,18 @@ public class DFSOutputStream extends FSOutputSummer
streamer.waitAndQueuePacket(currentPacket); streamer.waitAndQueuePacket(currentPacket);
currentPacket = null; currentPacket = null;
// If the reopened file did not end at chunk boundary and the above adjustChunkBoundary();
// write filled up its partial chunk. Tell the summer to generate full
// crc chunks from now on. endBlock();
}
}
/**
* If the reopened file did not end at chunk boundary and the above
* write filled up its partial chunk. Tell the summer to generate full
* crc chunks from now on.
*/
protected void adjustChunkBoundary() {
if (streamer.getAppendChunk() && if (streamer.getAppendChunk() &&
streamer.getBytesCurBlock() % bytesPerChecksum == 0) { streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
streamer.setAppendChunk(false); streamer.setAppendChunk(false);
@ -440,10 +448,15 @@ public class DFSOutputStream extends FSOutputSummer
dfsClient.getConf().writePacketSize); dfsClient.getConf().writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum); computePacketChunkSize(psize, bytesPerChecksum);
} }
// }
// if encountering a block boundary, send an empty packet to
// indicate the end of block and reset bytesCurBlock. /**
// * if encountering a block boundary, send an empty packet to
* indicate the end of block and reset bytesCurBlock.
*
* @throws IOException
*/
protected void endBlock() throws IOException {
if (streamer.getBytesCurBlock() == blockSize) { if (streamer.getBytesCurBlock() == blockSize) {
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true); streamer.getAndIncCurrentSeqno(), true);
@ -454,7 +467,6 @@ public class DFSOutputStream extends FSOutputSummer
lastFlushOffset = 0; lastFlushOffset = 0;
} }
} }
}
@Deprecated @Deprecated
public void sync() throws IOException { public void sync() throws IOException {
@ -681,7 +693,7 @@ public class DFSOutputStream extends FSOutputSummer
* Waits till all existing data is flushed and confirmations * Waits till all existing data is flushed and confirmations
* received from datanodes. * received from datanodes.
*/ */
private void flushInternal() throws IOException { protected void flushInternal() throws IOException {
long toWaitFor; long toWaitFor;
synchronized (this) { synchronized (this) {
dfsClient.checkOpen(); dfsClient.checkOpen();
@ -697,7 +709,7 @@ public class DFSOutputStream extends FSOutputSummer
streamer.waitForAckedSeqno(toWaitFor); streamer.waitForAckedSeqno(toWaitFor);
} }
private synchronized void start() { protected synchronized void start() {
streamer.start(); streamer.start();
} }
@ -726,7 +738,7 @@ public class DFSOutputStream extends FSOutputSummer
// shutdown datastreamer and responseprocessor threads. // shutdown datastreamer and responseprocessor threads.
// interrupt datastreamer if force is true // interrupt datastreamer if force is true
private void closeThreads(boolean force) throws IOException { protected void closeThreads(boolean force) throws IOException {
try { try {
streamer.close(force); streamer.close(force);
streamer.join(); streamer.join();
@ -754,7 +766,7 @@ public class DFSOutputStream extends FSOutputSummer
} }
} }
private synchronized void closeImpl() throws IOException { protected synchronized void closeImpl() throws IOException {
if (isClosed()) { if (isClosed()) {
IOException e = streamer.getLastException().getAndSet(null); IOException e = streamer.getLastException().getAndSet(null);
if (e == null) if (e == null)
@ -797,7 +809,7 @@ public class DFSOutputStream extends FSOutputSummer
// should be called holding (this) lock since setTestFilename() may // should be called holding (this) lock since setTestFilename() may
// be called during unit tests // be called during unit tests
private void completeFile(ExtendedBlock last) throws IOException { protected void completeFile(ExtendedBlock last) throws IOException {
long localstart = Time.monotonicNow(); long localstart = Time.monotonicNow();
long sleeptime = dfsClient.getConf(). long sleeptime = dfsClient.getConf().
blockWriteLocateFollowingInitialDelayMs; blockWriteLocateFollowingInitialDelayMs;

View File

@ -1519,7 +1519,7 @@ class DataStreamer extends Daemon {
} }
} }
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException { throws IOException {
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = dfsClient.getConf(). long sleeptime = dfsClient.getConf().
@ -1728,15 +1728,6 @@ class DataStreamer extends Daemon {
return lastException; return lastException;
} }
/**
* get the socket connecting to the first datanode in pipeline
*
* @return socket connecting to the first datanode in pipeline
*/
Socket getSocket() {
return s;
}
/** /**
* set socket to null * set socket to null
*/ */