HDFS-8283. DataStreamer cleanup and some minor improvement. Contributed by Tsz Wo Nicholas Sze.

(cherry picked from commit 7947e5b53b)
This commit is contained in:
Jing Zhao 2015-04-29 10:41:46 -07:00
parent 4cc38df7de
commit ecdebb7369
5 changed files with 162 additions and 135 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io; package org.apache.hadoop.io;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -51,4 +52,29 @@ public class MultipleIOException extends IOException {
} }
return new MultipleIOException(exceptions); return new MultipleIOException(exceptions);
} }
/**
* Build an {@link IOException} using {@link MultipleIOException}
* if there are more than one.
*/
public static class Builder {
private List<IOException> exceptions;
/** Add the given {@link Throwable} to the exception list. */
public void add(Throwable t) {
if (exceptions == null) {
exceptions = new ArrayList<>();
}
exceptions.add(t instanceof IOException? (IOException)t
: new IOException(t));
}
/**
* @return null if nothing is added to this builder;
* otherwise, return an {@link IOException}
*/
public IOException build() {
return createIOException(exceptions);
}
}
} }

View File

@ -156,6 +156,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8280. Code Cleanup in DFSInputStream. (Jing Zhao via wheat9) HDFS-8280. Code Cleanup in DFSInputStream. (Jing Zhao via wheat9)
HDFS-8283. DataStreamer cleanup and some minor improvement. (szetszwo via
jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -139,8 +139,7 @@ public class DFSOutputStream extends FSOutputSummer
@Override @Override
protected void checkClosed() throws IOException { protected void checkClosed() throws IOException {
if (isClosed()) { if (isClosed()) {
IOException e = streamer.getLastException().get(); streamer.getLastException().throwException4Close();
throw e != null ? e : new ClosedChannelException();
} }
} }
@ -216,10 +215,7 @@ public class DFSOutputStream extends FSOutputSummer
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum, streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
cachingStrategy, byteArrayManager); cachingStrategy, byteArrayManager, favoredNodes);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
} }
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
@ -282,7 +278,8 @@ public class DFSOutputStream extends FSOutputSummer
/** Construct a new output stream for append. */ /** Construct a new output stream for append. */
private DFSOutputStream(DFSClient dfsClient, String src, private DFSOutputStream(DFSClient dfsClient, String src,
EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock, EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
HdfsFileStatus stat, DataChecksum checksum) throws IOException { HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
throws IOException {
this(dfsClient, src, progress, stat, checksum); this(dfsClient, src, progress, stat, checksum);
initialFileSize = stat.getLen(); // length of file when opened initialFileSize = stat.getLen(); // length of file when opened
this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK); this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
@ -303,7 +300,8 @@ public class DFSOutputStream extends FSOutputSummer
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
bytesPerChecksum); bytesPerChecksum);
streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null, streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager); dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
favoredNodes);
} }
} }
@ -351,10 +349,7 @@ public class DFSOutputStream extends FSOutputSummer
dfsClient.getPathTraceScope("newStreamForAppend", src); dfsClient.getPathTraceScope("newStreamForAppend", src);
try { try {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
progress, lastBlock, stat, checksum); progress, lastBlock, stat, checksum, favoredNodes);
if (favoredNodes != null && favoredNodes.length != 0) {
out.streamer.setFavoredNodes(favoredNodes);
}
out.start(); out.start();
return out; return out;
} finally { } finally {
@ -658,7 +653,7 @@ public class DFSOutputStream extends FSOutputSummer
DFSClient.LOG.warn("Error while syncing", e); DFSClient.LOG.warn("Error while syncing", e);
synchronized (this) { synchronized (this) {
if (!isClosed()) { if (!isClosed()) {
streamer.getLastException().set(new IOException("IOException flush: " + e)); streamer.getLastException().set(e);
closeThreads(true); closeThreads(true);
} }
} }
@ -725,7 +720,7 @@ public class DFSOutputStream extends FSOutputSummer
if (isClosed()) { if (isClosed()) {
return; return;
} }
streamer.setLastException(new IOException("Lease timeout of " streamer.getLastException().set(new IOException("Lease timeout of "
+ (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired.")); + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
closeThreads(true); closeThreads(true);
dfsClient.endFileLease(fileId); dfsClient.endFileLease(fileId);
@ -772,11 +767,8 @@ public class DFSOutputStream extends FSOutputSummer
protected synchronized void closeImpl() throws IOException { protected synchronized void closeImpl() throws IOException {
if (isClosed()) { if (isClosed()) {
IOException e = streamer.getLastException().getAndSet(null); streamer.getLastException().check();
if (e == null)
return; return;
else
throw e;
} }
try { try {

View File

@ -41,6 +41,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@ -73,6 +75,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
@ -88,6 +91,7 @@ import org.apache.htrace.Trace;
import org.apache.htrace.TraceInfo; import org.apache.htrace.TraceInfo;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache; import com.google.common.cache.LoadingCache;
@ -117,6 +121,7 @@ import com.google.common.cache.RemovalNotification;
@InterfaceAudience.Private @InterfaceAudience.Private
class DataStreamer extends Daemon { class DataStreamer extends Daemon {
static final Log LOG = LogFactory.getLog(DataStreamer.class);
/** /**
* Create a socket for a write pipeline * Create a socket for a write pipeline
* *
@ -129,8 +134,8 @@ class DataStreamer extends Daemon {
final int length, final DFSClient client) throws IOException { final int length, final DFSClient client) throws IOException {
final DfsClientConf conf = client.getConf(); final DfsClientConf conf = client.getConf();
final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname()); final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
if (DFSClient.LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr); LOG.debug("Connecting to datanode " + dnAddr);
} }
final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr); final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
final Socket sock = client.socketFactory.createSocket(); final Socket sock = client.socketFactory.createSocket();
@ -138,8 +143,8 @@ class DataStreamer extends Daemon {
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout()); NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
sock.setSoTimeout(timeout); sock.setSoTimeout(timeout);
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
if(DFSClient.LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize()); LOG.debug("Send buf size " + sock.getSendBufferSize());
} }
return sock; return sock;
} }
@ -169,6 +174,34 @@ class DataStreamer extends Daemon {
packets.clear(); packets.clear();
} }
static class LastException {
private Throwable thrown;
synchronized void set(Throwable t) {
Preconditions.checkNotNull(t);
Preconditions.checkState(thrown == null);
this.thrown = t;
}
synchronized void clear() {
thrown = null;
}
/** Check if there already is an exception. */
synchronized void check() throws IOException {
if (thrown != null) {
throw new IOException(thrown);
}
}
synchronized void throwException4Close() throws IOException {
check();
final IOException ioe = new ClosedChannelException();
thrown = ioe;
throw ioe;
}
}
private volatile boolean streamerClosed = false; private volatile boolean streamerClosed = false;
private ExtendedBlock block; // its length is number of bytes acked private ExtendedBlock block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken; private Token<BlockTokenIdentifier> accessToken;
@ -178,7 +211,6 @@ class DataStreamer extends Daemon {
private volatile DatanodeInfo[] nodes = null; // list of targets for current block private volatile DatanodeInfo[] nodes = null; // list of targets for current block
private volatile StorageType[] storageTypes = null; private volatile StorageType[] storageTypes = null;
private volatile String[] storageIDs = null; private volatile String[] storageIDs = null;
private String[] favoredNodes;
volatile boolean hasError = false; volatile boolean hasError = false;
volatile int errorIndex = -1; volatile int errorIndex = -1;
// Restarting node index // Restarting node index
@ -196,13 +228,13 @@ class DataStreamer extends Daemon {
/** Has the current block been hflushed? */ /** Has the current block been hflushed? */
private boolean isHflushed = false; private boolean isHflushed = false;
/** Append on an existing block? */ /** Append on an existing block? */
private boolean isAppend; private final boolean isAppend;
private long currentSeqno = 0; private long currentSeqno = 0;
private long lastQueuedSeqno = -1; private long lastQueuedSeqno = -1;
private long lastAckedSeqno = -1; private long lastAckedSeqno = -1;
private long bytesCurBlock = 0; // bytes written in current block private long bytesCurBlock = 0; // bytes written in current block
private final AtomicReference<IOException> lastException = new AtomicReference<>(); private final LastException lastException = new LastException();
private Socket s; private Socket s;
private final DFSClient dfsClient; private final DFSClient dfsClient;
@ -227,18 +259,20 @@ class DataStreamer extends Daemon {
private long artificialSlowdown = 0; private long artificialSlowdown = 0;
// List of congested data nodes. The stream will back off if the DataNodes // List of congested data nodes. The stream will back off if the DataNodes
// are congested // are congested
private final ArrayList<DatanodeInfo> congestedNodes = new ArrayList<>(); private final List<DatanodeInfo> congestedNodes = new ArrayList<>();
private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000; private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS = private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10; CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
private int lastCongestionBackoffTime; private int lastCongestionBackoffTime;
private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes; private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
private final String[] favoredNodes;
private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
Progressable progress, DataChecksum checksum, Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy, AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage){ ByteArrayManager byteArrayManage,
boolean isAppend, String[] favoredNodes) {
this.dfsClient = dfsClient; this.dfsClient = dfsClient;
this.src = src; this.src = src;
this.progress = progress; this.progress = progress;
@ -246,10 +280,12 @@ class DataStreamer extends Daemon {
this.checksum4WriteBlock = checksum; this.checksum4WriteBlock = checksum;
this.cachingStrategy = cachingStrategy; this.cachingStrategy = cachingStrategy;
this.byteArrayManager = byteArrayManage; this.byteArrayManager = byteArrayManage;
isLazyPersistFile = isLazyPersist(stat); this.isLazyPersistFile = isLazyPersist(stat);
this.dfsclientSlowLogThresholdMs = this.dfsclientSlowLogThresholdMs =
dfsClient.getConf().getSlowIoWarningThresholdMs(); dfsClient.getConf().getSlowIoWarningThresholdMs();
excludedNodes = initExcludedNodes(); this.excludedNodes = initExcludedNodes();
this.isAppend = isAppend;
this.favoredNodes = favoredNodes;
} }
/** /**
@ -258,10 +294,9 @@ class DataStreamer extends Daemon {
DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
String src, Progressable progress, DataChecksum checksum, String src, Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy, AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage) { ByteArrayManager byteArrayManage, String[] favoredNodes) {
this(stat, dfsClient, src, progress, checksum, cachingStrategy, this(stat, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage); byteArrayManage, false, favoredNodes);
isAppend = false;
this.block = block; this.block = block;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
} }
@ -277,8 +312,7 @@ class DataStreamer extends Daemon {
AtomicReference<CachingStrategy> cachingStrategy, AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage) throws IOException { ByteArrayManager byteArrayManage) throws IOException {
this(stat, dfsClient, src, progress, checksum, cachingStrategy, this(stat, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage); byteArrayManage, true, null);
isAppend = true;
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
block = lastBlock.getBlock(); block = lastBlock.getBlock();
bytesSent = block.getNumBytes(); bytesSent = block.getNumBytes();
@ -313,15 +347,6 @@ class DataStreamer extends Daemon {
this.storageIDs = storageIDs; this.storageIDs = storageIDs;
} }
/**
* Set favored nodes
*
* @param favoredNodes favored nodes
*/
void setFavoredNodes(String[] favoredNodes) {
this.favoredNodes = favoredNodes;
}
/** /**
* Initialize for data streaming * Initialize for data streaming
*/ */
@ -334,8 +359,8 @@ class DataStreamer extends Daemon {
} }
private void endBlock() { private void endBlock() {
if(DFSClient.LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Closing old block " + block); LOG.debug("Closing old block " + block);
} }
this.setName("DataStreamer for file " + src); this.setName("DataStreamer for file " + src);
closeResponder(); closeResponder();
@ -360,7 +385,7 @@ class DataStreamer extends Daemon {
response.join(); response.join();
response = null; response = null;
} catch (InterruptedException e) { } catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e); LOG.warn("Caught exception", e);
} }
} }
@ -388,7 +413,7 @@ class DataStreamer extends Daemon {
try { try {
dataQueue.wait(timeout); dataQueue.wait(timeout);
} catch (InterruptedException e) { } catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e); LOG.warn("Caught exception", e);
} }
doSleep = false; doSleep = false;
now = Time.monotonicNow(); now = Time.monotonicNow();
@ -404,7 +429,7 @@ class DataStreamer extends Daemon {
try { try {
backOffIfNecessary(); backOffIfNecessary();
} catch (InterruptedException e) { } catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e); LOG.warn("Caught exception", e);
} }
one = dataQueue.getFirst(); // regular data packet one = dataQueue.getFirst(); // regular data packet
long parents[] = one.getTraceParents(); long parents[] = one.getTraceParents();
@ -419,14 +444,14 @@ class DataStreamer extends Daemon {
// get new block from namenode. // get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if(DFSClient.LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Allocating new block"); LOG.debug("Allocating new block");
} }
setPipeline(nextBlockOutputStream()); setPipeline(nextBlockOutputStream());
initDataStreaming(); initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
if(DFSClient.LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Append to block " + block); LOG.debug("Append to block " + block);
} }
setupPipelineForAppendOrRecovery(); setupPipelineForAppendOrRecovery();
initDataStreaming(); initDataStreaming();
@ -450,7 +475,7 @@ class DataStreamer extends Daemon {
// wait for acks to arrive from datanodes // wait for acks to arrive from datanodes
dataQueue.wait(1000); dataQueue.wait(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e); LOG.warn("Caught exception", e);
} }
} }
} }
@ -473,8 +498,8 @@ class DataStreamer extends Daemon {
} }
} }
if (DFSClient.LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DataStreamer block " + block + LOG.debug("DataStreamer block " + block +
" sending packet " + one); " sending packet " + one);
} }
@ -534,16 +559,12 @@ class DataStreamer extends Daemon {
// Since their messages are descriptive enough, do not always // Since their messages are descriptive enough, do not always
// log a verbose stack-trace WARN for quota exceptions. // log a verbose stack-trace WARN for quota exceptions.
if (e instanceof QuotaExceededException) { if (e instanceof QuotaExceededException) {
DFSClient.LOG.debug("DataStreamer Quota Exception", e); LOG.debug("DataStreamer Quota Exception", e);
} else { } else {
DFSClient.LOG.warn("DataStreamer Exception", e); LOG.warn("DataStreamer Exception", e);
} }
} }
if (e instanceof IOException) { lastException.set(e);
setLastException((IOException)e);
} else {
setLastException(new IOException("DataStreamer Exception: ",e));
}
hasError = true; hasError = true;
if (errorIndex == -1 && restartingNodeIndex.get() == -1) { if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
// Not a datanode issue // Not a datanode issue
@ -586,8 +607,8 @@ class DataStreamer extends Daemon {
void waitForAckedSeqno(long seqno) throws IOException { void waitForAckedSeqno(long seqno) throws IOException {
TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER); TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
try { try {
if (DFSClient.LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Waiting for ack for: " + seqno); LOG.debug("Waiting for ack for: " + seqno);
} }
long begin = Time.monotonicNow(); long begin = Time.monotonicNow();
try { try {
@ -611,7 +632,7 @@ class DataStreamer extends Daemon {
} }
long duration = Time.monotonicNow() - begin; long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs) { if (duration > dfsclientSlowLogThresholdMs) {
DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration LOG.warn("Slow waitForAckedSeqno took " + duration
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)"); + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
} }
} finally { } finally {
@ -688,8 +709,7 @@ class DataStreamer extends Daemon {
private void checkClosed() throws IOException { private void checkClosed() throws IOException {
if (streamerClosed) { if (streamerClosed) {
IOException e = lastException.get(); lastException.throwException4Close();
throw e != null ? e : new ClosedChannelException();
} }
} }
@ -699,7 +719,7 @@ class DataStreamer extends Daemon {
response.close(); response.close();
response.join(); response.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e); LOG.warn("Caught exception", e);
} finally { } finally {
response = null; response = null;
} }
@ -707,11 +727,13 @@ class DataStreamer extends Daemon {
} }
private void closeStream() { private void closeStream() {
final MultipleIOException.Builder b = new MultipleIOException.Builder();
if (blockStream != null) { if (blockStream != null) {
try { try {
blockStream.close(); blockStream.close();
} catch (IOException e) { } catch (IOException e) {
setLastException(e); b.add(e);
} finally { } finally {
blockStream = null; blockStream = null;
} }
@ -720,7 +742,7 @@ class DataStreamer extends Daemon {
try { try {
blockReplyStream.close(); blockReplyStream.close();
} catch (IOException e) { } catch (IOException e) {
setLastException(e); b.add(e);
} finally { } finally {
blockReplyStream = null; blockReplyStream = null;
} }
@ -729,11 +751,16 @@ class DataStreamer extends Daemon {
try { try {
s.close(); s.close();
} catch (IOException e) { } catch (IOException e) {
setLastException(e); b.add(e);
} finally { } finally {
s = null; s = null;
} }
} }
final IOException ioe = b.build();
if (ioe != null) {
lastException.set(ioe);
}
} }
// The following synchronized methods are used whenever // The following synchronized methods are used whenever
@ -825,12 +852,11 @@ class DataStreamer extends Daemon {
long duration = Time.monotonicNow() - begin; long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs if (duration > dfsclientSlowLogThresholdMs
&& ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) { && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
DFSClient.LOG LOG.warn("Slow ReadProcessor read fields took " + duration
.warn("Slow ReadProcessor read fields took " + duration
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
+ ack + ", targets: " + Arrays.asList(targets)); + ack + ", targets: " + Arrays.asList(targets));
} else if (DFSClient.LOG.isDebugEnabled()) { } else if (LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient " + ack); LOG.debug("DFSClient " + ack);
} }
long seqno = ack.getSeqno(); long seqno = ack.getSeqno();
@ -851,7 +877,7 @@ class DataStreamer extends Daemon {
+ Time.monotonicNow(); + Time.monotonicNow();
setRestartingNodeIndex(i); setRestartingNodeIndex(i);
String message = "A datanode is restarting: " + targets[i]; String message = "A datanode is restarting: " + targets[i];
DFSClient.LOG.info(message); LOG.info(message);
throw new IOException(message); throw new IOException(message);
} }
// node error // node error
@ -917,9 +943,7 @@ class DataStreamer extends Daemon {
} }
} catch (Exception e) { } catch (Exception e) {
if (!responderClosed) { if (!responderClosed) {
if (e instanceof IOException) { lastException.set(e);
setLastException((IOException)e);
}
hasError = true; hasError = true;
// If no explicit error report was received, mark the primary // If no explicit error report was received, mark the primary
// node as failed. // node as failed.
@ -928,8 +952,7 @@ class DataStreamer extends Daemon {
dataQueue.notifyAll(); dataQueue.notifyAll();
} }
if (restartingNodeIndex.get() == -1) { if (restartingNodeIndex.get() == -1) {
DFSClient.LOG.warn("DataStreamer ResponseProcessor exception " LOG.warn("Exception for " + block, e);
+ " for block " + block, e);
} }
responderClosed = true; responderClosed = true;
} }
@ -951,7 +974,7 @@ class DataStreamer extends Daemon {
// //
private boolean processDatanodeError() throws IOException { private boolean processDatanodeError() throws IOException {
if (response != null) { if (response != null) {
DFSClient.LOG.info("Error Recovery for " + block + LOG.info("Error Recovery for " + block +
" waiting for responder to exit. "); " waiting for responder to exit. ");
return true; return true;
} }
@ -972,7 +995,7 @@ class DataStreamer extends Daemon {
// same packet, this client likely has corrupt data or corrupting // same packet, this client likely has corrupt data or corrupting
// during transmission. // during transmission.
if (++pipelineRecoveryCount > 5) { if (++pipelineRecoveryCount > 5) {
DFSClient.LOG.warn("Error recovering pipeline for writing " + LOG.warn("Error recovering pipeline for writing " +
block + ". Already retried 5 times for the same packet."); block + ". Already retried 5 times for the same packet.");
lastException.set(new IOException("Failing write. Tried pipeline " + lastException.set(new IOException("Failing write. Tried pipeline " +
"recovery 5 times without success.")); "recovery 5 times without success."));
@ -1147,8 +1170,8 @@ class DataStreamer extends Daemon {
if (nodes == null || nodes.length == 0) { if (nodes == null || nodes.length == 0) {
String msg = "Could not get block locations. " + "Source file \"" String msg = "Could not get block locations. " + "Source file \""
+ src + "\" - Aborting..."; + src + "\" - Aborting...";
DFSClient.LOG.warn(msg); LOG.warn(msg);
setLastException(new IOException(msg)); lastException.set(new IOException(msg));
streamerClosed = true; streamerClosed = true;
return false; return false;
} }
@ -1193,7 +1216,7 @@ class DataStreamer extends Daemon {
streamerClosed = true; streamerClosed = true;
return false; return false;
} }
DFSClient.LOG.warn("Error Recovery for block " + block + LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg + " in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex]); ": bad datanode " + nodes[errorIndex]);
failed.add(nodes[errorIndex]); failed.add(nodes[errorIndex]);
@ -1227,7 +1250,7 @@ class DataStreamer extends Daemon {
if (restartingNodeIndex.get() == -1) { if (restartingNodeIndex.get() == -1) {
hasError = false; hasError = false;
} }
lastException.set(null); lastException.clear();
errorIndex = -1; errorIndex = -1;
} }
@ -1240,7 +1263,7 @@ class DataStreamer extends Daemon {
if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) { if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
throw ioe; throw ioe;
} }
DFSClient.LOG.warn("Failed to replace datanode." LOG.warn("Failed to replace datanode."
+ " Continue with the remaining datanodes since " + " Continue with the remaining datanodes since "
+ HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
+ " is set to true.", ioe); + " is set to true.", ioe);
@ -1281,7 +1304,7 @@ class DataStreamer extends Daemon {
restartDeadline = 0; restartDeadline = 0;
int expiredNodeIndex = restartingNodeIndex.get(); int expiredNodeIndex = restartingNodeIndex.get();
restartingNodeIndex.set(-1); restartingNodeIndex.set(-1);
DFSClient.LOG.warn("Datanode did not restart in time: " + LOG.warn("Datanode did not restart in time: " +
nodes[expiredNodeIndex]); nodes[expiredNodeIndex]);
// Mark the restarting node as failed. If there is any other failed // Mark the restarting node as failed. If there is any other failed
// node during the last pipeline construction attempt, it will not be // node during the last pipeline construction attempt, it will not be
@ -1321,7 +1344,7 @@ class DataStreamer extends Daemon {
ExtendedBlock oldBlock = block; ExtendedBlock oldBlock = block;
do { do {
hasError = false; hasError = false;
lastException.set(null); lastException.clear();
errorIndex = -1; errorIndex = -1;
success = false; success = false;
@ -1344,11 +1367,11 @@ class DataStreamer extends Daemon {
success = createBlockOutputStream(nodes, storageTypes, 0L, false); success = createBlockOutputStream(nodes, storageTypes, 0L, false);
if (!success) { if (!success) {
DFSClient.LOG.info("Abandoning " + block); LOG.info("Abandoning " + block);
dfsClient.namenode.abandonBlock(block, stat.getFileId(), src, dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
dfsClient.clientName); dfsClient.clientName);
block = null; block = null;
DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]); LOG.info("Excluding datanode " + nodes[errorIndex]);
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]); excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
} }
} while (!success && --count >= 0); } while (!success && --count >= 0);
@ -1365,17 +1388,14 @@ class DataStreamer extends Daemon {
private boolean createBlockOutputStream(DatanodeInfo[] nodes, private boolean createBlockOutputStream(DatanodeInfo[] nodes,
StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
if (nodes.length == 0) { if (nodes.length == 0) {
DFSClient.LOG.info("nodes are empty for write pipeline of block " LOG.info("nodes are empty for write pipeline of " + block);
+ block);
return false; return false;
} }
Status pipelineStatus = SUCCESS; Status pipelineStatus = SUCCESS;
String firstBadLink = ""; String firstBadLink = "";
boolean checkRestart = false; boolean checkRestart = false;
if (DFSClient.LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
for (int i = 0; i < nodes.length; i++) { LOG.debug("pipeline = " + Arrays.asList(nodes));
DFSClient.LOG.debug("pipeline = " + nodes[i]);
}
} }
// persist blocks on namenode on next flush // persist blocks on namenode on next flush
@ -1447,10 +1467,10 @@ class DataStreamer extends Daemon {
hasError = false; hasError = false;
} catch (IOException ie) { } catch (IOException ie) {
if (restartingNodeIndex.get() == -1) { if (restartingNodeIndex.get() == -1) {
DFSClient.LOG.info("Exception in createBlockOutputStream", ie); LOG.info("Exception in createBlockOutputStream", ie);
} }
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, " LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + "encryption key was invalid when connecting to "
+ nodes[0] + " : " + ie); + nodes[0] + " : " + ie);
// The encryption key used is invalid. // The encryption key used is invalid.
@ -1480,11 +1500,11 @@ class DataStreamer extends Daemon {
+ Time.monotonicNow(); + Time.monotonicNow();
restartingNodeIndex.set(errorIndex); restartingNodeIndex.set(errorIndex);
errorIndex = -1; errorIndex = -1;
DFSClient.LOG.info("Waiting for the datanode to be restarted: " + LOG.info("Waiting for the datanode to be restarted: " +
nodes[restartingNodeIndex.get()]); nodes[restartingNodeIndex.get()]);
} }
hasError = true; hasError = true;
setLastException(ie); lastException.set(ie);
result = false; // error result = false; // error
} finally { } finally {
if (!result) { if (!result) {
@ -1509,18 +1529,16 @@ class DataStreamer extends Daemon {
new HashSet<String>(Arrays.asList(favoredNodes)); new HashSet<String>(Arrays.asList(favoredNodes));
for (int i = 0; i < nodes.length; i++) { for (int i = 0; i < nodes.length; i++) {
pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname()); pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
if (DFSClient.LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
DFSClient.LOG.debug(nodes[i].getXferAddrWithHostname() + LOG.debug(nodes[i].getXferAddrWithHostname() +
" was chosen by name node (favored=" + pinnings[i] + " was chosen by name node (favored=" + pinnings[i] + ").");
").");
} }
} }
if (shouldLog && !favoredSet.isEmpty()) { if (shouldLog && !favoredSet.isEmpty()) {
// There is one or more favored nodes that were not allocated. // There is one or more favored nodes that were not allocated.
DFSClient.LOG.warn( LOG.warn("These favored nodes were specified but not chosen: "
"These favored nodes were specified but not chosen: " + + favoredSet + " Specified favored nodes: "
favoredSet + + Arrays.toString(favoredNodes));
" Specified favored nodes: " + Arrays.toString(favoredNodes));
} }
return pinnings; return pinnings;
@ -1557,19 +1575,19 @@ class DataStreamer extends Daemon {
throw e; throw e;
} else { } else {
--retries; --retries;
DFSClient.LOG.info("Exception while adding a block", e); LOG.info("Exception while adding a block", e);
long elapsed = Time.monotonicNow() - localstart; long elapsed = Time.monotonicNow() - localstart;
if (elapsed > 5000) { if (elapsed > 5000) {
DFSClient.LOG.info("Waiting for replication for " LOG.info("Waiting for replication for "
+ (elapsed / 1000) + " seconds"); + (elapsed / 1000) + " seconds");
} }
try { try {
DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src LOG.warn("NotReplicatedYetException sleeping " + src
+ " retries left " + retries); + " retries left " + retries);
Thread.sleep(sleeptime); Thread.sleep(sleeptime);
sleeptime *= 2; sleeptime *= 2;
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
DFSClient.LOG.warn("Caught exception ", ie); LOG.warn("Caught exception", ie);
} }
} }
} else { } else {
@ -1606,7 +1624,7 @@ class DataStreamer extends Daemon {
(int)(base + Math.random() * range)); (int)(base + Math.random() * range));
lastCongestionBackoffTime = t; lastCongestionBackoffTime = t;
sb.append(" are congested. Backing off for ").append(t).append(" ms"); sb.append(" are congested. Backing off for ").append(t).append(" ms");
DFSClient.LOG.info(sb.toString()); LOG.info(sb.toString());
congestedNodes.clear(); congestedNodes.clear();
} }
} }
@ -1642,15 +1660,6 @@ class DataStreamer extends Daemon {
return accessToken; return accessToken;
} }
/**
* set last exception
*
* @param e an exception
*/
void setLastException(IOException e) {
lastException.compareAndSet(null, e);
}
/** /**
* Put a packet to the data queue * Put a packet to the data queue
* *
@ -1662,8 +1671,8 @@ class DataStreamer extends Daemon {
packet.addTraceParent(Trace.currentSpan()); packet.addTraceParent(Trace.currentSpan());
dataQueue.addLast(packet); dataQueue.addLast(packet);
lastQueuedSeqno = packet.getSeqno(); lastQueuedSeqno = packet.getSeqno();
if (DFSClient.LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Queued packet " + packet.getSeqno()); LOG.debug("Queued packet " + packet.getSeqno());
} }
dataQueue.notifyAll(); dataQueue.notifyAll();
} }
@ -1686,7 +1695,7 @@ class DataStreamer extends Daemon {
@Override @Override
public void onRemoval( public void onRemoval(
RemovalNotification<DatanodeInfo, DatanodeInfo> notification) { RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
DFSClient.LOG.info("Removing node " + notification.getKey() LOG.info("Removing node " + notification.getKey()
+ " from the excluded nodes list"); + " from the excluded nodes list");
} }
}).build(new CacheLoader<DatanodeInfo, DatanodeInfo>() { }).build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
@ -1730,11 +1739,9 @@ class DataStreamer extends Daemon {
} }
/** /**
* get the last exception
*
* @return the last exception * @return the last exception
*/ */
AtomicReference<IOException> getLastException(){ LastException getLastException(){
return lastException; return lastException;
} }

View File

@ -62,7 +62,6 @@ public class TestDFSOutputStream {
FSDataOutputStream os = fs.create(new Path("/test")); FSDataOutputStream os = fs.create(new Path("/test"));
DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os, DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,
"wrappedStream"); "wrappedStream");
@SuppressWarnings("unchecked")
DataStreamer streamer = (DataStreamer) Whitebox DataStreamer streamer = (DataStreamer) Whitebox
.getInternalState(dos, "streamer"); .getInternalState(dos, "streamer");
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -122,7 +121,7 @@ public class TestDFSOutputStream {
mock(HdfsFileStatus.class), mock(HdfsFileStatus.class),
mock(ExtendedBlock.class), mock(ExtendedBlock.class),
client, client,
"foo", null, null, null, null); "foo", null, null, null, null, null);
DataOutputStream blockStream = mock(DataOutputStream.class); DataOutputStream blockStream = mock(DataOutputStream.class);
doThrow(new IOException()).when(blockStream).flush(); doThrow(new IOException()).when(blockStream).flush();