From ecdebb7369008883a76242025cf1078b117857d3 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Wed, 29 Apr 2015 10:41:46 -0700 Subject: [PATCH] HDFS-8283. DataStreamer cleanup and some minor improvement. Contributed by Tsz Wo Nicholas Sze. (cherry picked from commit 7947e5b53b9ac9524b535b0384c1c355b74723ff) --- .../apache/hadoop/io/MultipleIOException.java | 26 ++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../apache/hadoop/hdfs/DFSOutputStream.java | 30 +-- .../org/apache/hadoop/hdfs/DataStreamer.java | 235 +++++++++--------- .../hadoop/hdfs/TestDFSOutputStream.java | 3 +- 5 files changed, 162 insertions(+), 135 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MultipleIOException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MultipleIOException.java index 5e584c9cd07..66c1ab12209 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MultipleIOException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MultipleIOException.java @@ -18,6 +18,7 @@ package org.apache.hadoop.io; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -51,4 +52,29 @@ public static IOException createIOException(List exceptions) { } return new MultipleIOException(exceptions); } + + /** + * Build an {@link IOException} using {@link MultipleIOException} + * if there are more than one. + */ + public static class Builder { + private List exceptions; + + /** Add the given {@link Throwable} to the exception list. */ + public void add(Throwable t) { + if (exceptions == null) { + exceptions = new ArrayList<>(); + } + exceptions.add(t instanceof IOException? (IOException)t + : new IOException(t)); + } + + /** + * @return null if nothing is added to this builder; + * otherwise, return an {@link IOException} + */ + public IOException build() { + return createIOException(exceptions); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0f97ac334ac..8a4fd798ea8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -156,6 +156,9 @@ Release 2.8.0 - UNRELEASED HDFS-8280. Code Cleanup in DFSInputStream. (Jing Zhao via wheat9) + HDFS-8283. DataStreamer cleanup and some minor improvement. (szetszwo via + jing9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 47885e9b662..f902d211982 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -139,8 +139,7 @@ protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetIn @Override protected void checkClosed() throws IOException { if (isClosed()) { - IOException e = streamer.getLastException().get(); - throw e != null ? e : new ClosedChannelException(); + streamer.getLastException().throwException4Close(); } } @@ -216,10 +215,7 @@ protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum, - cachingStrategy, byteArrayManager); - if (favoredNodes != null && favoredNodes.length != 0) { - streamer.setFavoredNodes(favoredNodes); - } + cachingStrategy, byteArrayManager, favoredNodes); } static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, @@ -282,7 +278,8 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, /** Construct a new output stream for append. */ private DFSOutputStream(DFSClient dfsClient, String src, EnumSet flags, Progressable progress, LocatedBlock lastBlock, - HdfsFileStatus stat, DataChecksum checksum) throws IOException { + HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) + throws IOException { this(dfsClient, src, progress, stat, checksum); initialFileSize = stat.getLen(); // length of file when opened this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK); @@ -303,7 +300,8 @@ private DFSOutputStream(DFSClient dfsClient, String src, computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null, - dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager); + dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, + favoredNodes); } } @@ -351,10 +349,7 @@ static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, dfsClient.getPathTraceScope("newStreamForAppend", src); try { final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, - progress, lastBlock, stat, checksum); - if (favoredNodes != null && favoredNodes.length != 0) { - out.streamer.setFavoredNodes(favoredNodes); - } + progress, lastBlock, stat, checksum, favoredNodes); out.start(); return out; } finally { @@ -658,7 +653,7 @@ private void flushOrSync(boolean isSync, EnumSet syncFlags) DFSClient.LOG.warn("Error while syncing", e); synchronized (this) { if (!isClosed()) { - streamer.getLastException().set(new IOException("IOException flush: " + e)); + streamer.getLastException().set(e); closeThreads(true); } } @@ -725,7 +720,7 @@ synchronized void abort() throws IOException { if (isClosed()) { return; } - streamer.setLastException(new IOException("Lease timeout of " + streamer.getLastException().set(new IOException("Lease timeout of " + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired.")); closeThreads(true); dfsClient.endFileLease(fileId); @@ -772,11 +767,8 @@ public synchronized void close() throws IOException { protected synchronized void closeImpl() throws IOException { if (isClosed()) { - IOException e = streamer.getLastException().getAndSet(null); - if (e == null) - return; - else - throw e; + streamer.getLastException().check(); + return; } try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 5f0c9ac6394..3727d20ea91 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -41,6 +41,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -73,6 +75,7 @@ import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; @@ -88,6 +91,7 @@ import org.apache.htrace.TraceInfo; import org.apache.htrace.TraceScope; +import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -117,6 +121,7 @@ @InterfaceAudience.Private class DataStreamer extends Daemon { + static final Log LOG = LogFactory.getLog(DataStreamer.class); /** * Create a socket for a write pipeline * @@ -129,8 +134,8 @@ static Socket createSocketForPipeline(final DatanodeInfo first, final int length, final DFSClient client) throws IOException { final DfsClientConf conf = client.getConf(); final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname()); - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Connecting to datanode " + dnAddr); + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to datanode " + dnAddr); } final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr); final Socket sock = client.socketFactory.createSocket(); @@ -138,8 +143,8 @@ static Socket createSocketForPipeline(final DatanodeInfo first, NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout()); sock.setSoTimeout(timeout); sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - if(DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize()); + if (LOG.isDebugEnabled()) { + LOG.debug("Send buf size " + sock.getSendBufferSize()); } return sock; } @@ -168,6 +173,34 @@ private static void releaseBuffer(List packets, ByteArrayManager bam) } packets.clear(); } + + static class LastException { + private Throwable thrown; + + synchronized void set(Throwable t) { + Preconditions.checkNotNull(t); + Preconditions.checkState(thrown == null); + this.thrown = t; + } + + synchronized void clear() { + thrown = null; + } + + /** Check if there already is an exception. */ + synchronized void check() throws IOException { + if (thrown != null) { + throw new IOException(thrown); + } + } + + synchronized void throwException4Close() throws IOException { + check(); + final IOException ioe = new ClosedChannelException(); + thrown = ioe; + throw ioe; + } + } private volatile boolean streamerClosed = false; private ExtendedBlock block; // its length is number of bytes acked @@ -178,7 +211,6 @@ private static void releaseBuffer(List packets, ByteArrayManager bam) private volatile DatanodeInfo[] nodes = null; // list of targets for current block private volatile StorageType[] storageTypes = null; private volatile String[] storageIDs = null; - private String[] favoredNodes; volatile boolean hasError = false; volatile int errorIndex = -1; // Restarting node index @@ -196,13 +228,13 @@ private static void releaseBuffer(List packets, ByteArrayManager bam) /** Has the current block been hflushed? */ private boolean isHflushed = false; /** Append on an existing block? */ - private boolean isAppend; + private final boolean isAppend; private long currentSeqno = 0; private long lastQueuedSeqno = -1; private long lastAckedSeqno = -1; private long bytesCurBlock = 0; // bytes written in current block - private final AtomicReference lastException = new AtomicReference<>(); + private final LastException lastException = new LastException(); private Socket s; private final DFSClient dfsClient; @@ -227,18 +259,20 @@ private static void releaseBuffer(List packets, ByteArrayManager bam) private long artificialSlowdown = 0; // List of congested data nodes. The stream will back off if the DataNodes // are congested - private final ArrayList congestedNodes = new ArrayList<>(); + private final List congestedNodes = new ArrayList<>(); private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000; private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS = CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10; private int lastCongestionBackoffTime; private final LoadingCache excludedNodes; + private final String[] favoredNodes; private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, - ByteArrayManager byteArrayManage){ + ByteArrayManager byteArrayManage, + boolean isAppend, String[] favoredNodes) { this.dfsClient = dfsClient; this.src = src; this.progress = progress; @@ -246,10 +280,12 @@ private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, this.checksum4WriteBlock = checksum; this.cachingStrategy = cachingStrategy; this.byteArrayManager = byteArrayManage; - isLazyPersistFile = isLazyPersist(stat); + this.isLazyPersistFile = isLazyPersist(stat); this.dfsclientSlowLogThresholdMs = dfsClient.getConf().getSlowIoWarningThresholdMs(); - excludedNodes = initExcludedNodes(); + this.excludedNodes = initExcludedNodes(); + this.isAppend = isAppend; + this.favoredNodes = favoredNodes; } /** @@ -258,10 +294,9 @@ private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, - ByteArrayManager byteArrayManage) { + ByteArrayManager byteArrayManage, String[] favoredNodes) { this(stat, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage); - isAppend = false; + byteArrayManage, false, favoredNodes); this.block = block; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -277,8 +312,7 @@ private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, AtomicReference cachingStrategy, ByteArrayManager byteArrayManage) throws IOException { this(stat, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage); - isAppend = true; + byteArrayManage, true, null); stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; block = lastBlock.getBlock(); bytesSent = block.getNumBytes(); @@ -313,15 +347,6 @@ private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, this.storageIDs = storageIDs; } - /** - * Set favored nodes - * - * @param favoredNodes favored nodes - */ - void setFavoredNodes(String[] favoredNodes) { - this.favoredNodes = favoredNodes; - } - /** * Initialize for data streaming */ @@ -334,8 +359,8 @@ private void initDataStreaming() { } private void endBlock() { - if(DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Closing old block " + block); + if(LOG.isDebugEnabled()) { + LOG.debug("Closing old block " + block); } this.setName("DataStreamer for file " + src); closeResponder(); @@ -360,7 +385,7 @@ public void run() { response.join(); response = null; } catch (InterruptedException e) { - DFSClient.LOG.warn("Caught exception ", e); + LOG.warn("Caught exception", e); } } @@ -388,7 +413,7 @@ public void run() { try { dataQueue.wait(timeout); } catch (InterruptedException e) { - DFSClient.LOG.warn("Caught exception ", e); + LOG.warn("Caught exception", e); } doSleep = false; now = Time.monotonicNow(); @@ -404,7 +429,7 @@ public void run() { try { backOffIfNecessary(); } catch (InterruptedException e) { - DFSClient.LOG.warn("Caught exception ", e); + LOG.warn("Caught exception", e); } one = dataQueue.getFirst(); // regular data packet long parents[] = one.getTraceParents(); @@ -419,14 +444,14 @@ public void run() { // get new block from namenode. if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { - if(DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Allocating new block"); + if(LOG.isDebugEnabled()) { + LOG.debug("Allocating new block"); } setPipeline(nextBlockOutputStream()); initDataStreaming(); } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { - if(DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Append to block " + block); + if(LOG.isDebugEnabled()) { + LOG.debug("Append to block " + block); } setupPipelineForAppendOrRecovery(); initDataStreaming(); @@ -450,7 +475,7 @@ public void run() { // wait for acks to arrive from datanodes dataQueue.wait(1000); } catch (InterruptedException e) { - DFSClient.LOG.warn("Caught exception ", e); + LOG.warn("Caught exception", e); } } } @@ -473,8 +498,8 @@ public void run() { } } - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("DataStreamer block " + block + + if (LOG.isDebugEnabled()) { + LOG.debug("DataStreamer block " + block + " sending packet " + one); } @@ -534,16 +559,12 @@ public void run() { // Since their messages are descriptive enough, do not always // log a verbose stack-trace WARN for quota exceptions. if (e instanceof QuotaExceededException) { - DFSClient.LOG.debug("DataStreamer Quota Exception", e); + LOG.debug("DataStreamer Quota Exception", e); } else { - DFSClient.LOG.warn("DataStreamer Exception", e); + LOG.warn("DataStreamer Exception", e); } } - if (e instanceof IOException) { - setLastException((IOException)e); - } else { - setLastException(new IOException("DataStreamer Exception: ",e)); - } + lastException.set(e); hasError = true; if (errorIndex == -1 && restartingNodeIndex.get() == -1) { // Not a datanode issue @@ -586,8 +607,8 @@ void release() { void waitForAckedSeqno(long seqno) throws IOException { TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER); try { - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Waiting for ack for: " + seqno); + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for ack for: " + seqno); } long begin = Time.monotonicNow(); try { @@ -611,7 +632,7 @@ void waitForAckedSeqno(long seqno) throws IOException { } long duration = Time.monotonicNow() - begin; if (duration > dfsclientSlowLogThresholdMs) { - DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration + LOG.warn("Slow waitForAckedSeqno took " + duration + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)"); } } finally { @@ -688,8 +709,7 @@ void close(boolean force) { private void checkClosed() throws IOException { if (streamerClosed) { - IOException e = lastException.get(); - throw e != null ? e : new ClosedChannelException(); + lastException.throwException4Close(); } } @@ -699,7 +719,7 @@ private void closeResponder() { response.close(); response.join(); } catch (InterruptedException e) { - DFSClient.LOG.warn("Caught exception ", e); + LOG.warn("Caught exception", e); } finally { response = null; } @@ -707,11 +727,13 @@ private void closeResponder() { } private void closeStream() { + final MultipleIOException.Builder b = new MultipleIOException.Builder(); + if (blockStream != null) { try { blockStream.close(); } catch (IOException e) { - setLastException(e); + b.add(e); } finally { blockStream = null; } @@ -720,7 +742,7 @@ private void closeStream() { try { blockReplyStream.close(); } catch (IOException e) { - setLastException(e); + b.add(e); } finally { blockReplyStream = null; } @@ -729,11 +751,16 @@ private void closeStream() { try { s.close(); } catch (IOException e) { - setLastException(e); + b.add(e); } finally { s = null; } } + + final IOException ioe = b.build(); + if (ioe != null) { + lastException.set(ioe); + } } // The following synchronized methods are used whenever @@ -825,12 +852,11 @@ public void run() { long duration = Time.monotonicNow() - begin; if (duration > dfsclientSlowLogThresholdMs && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) { - DFSClient.LOG - .warn("Slow ReadProcessor read fields took " + duration - + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " - + ack + ", targets: " + Arrays.asList(targets)); - } else if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("DFSClient " + ack); + LOG.warn("Slow ReadProcessor read fields took " + duration + + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " + + ack + ", targets: " + Arrays.asList(targets)); + } else if (LOG.isDebugEnabled()) { + LOG.debug("DFSClient " + ack); } long seqno = ack.getSeqno(); @@ -851,7 +877,7 @@ public void run() { + Time.monotonicNow(); setRestartingNodeIndex(i); String message = "A datanode is restarting: " + targets[i]; - DFSClient.LOG.info(message); + LOG.info(message); throw new IOException(message); } // node error @@ -917,9 +943,7 @@ public void run() { } } catch (Exception e) { if (!responderClosed) { - if (e instanceof IOException) { - setLastException((IOException)e); - } + lastException.set(e); hasError = true; // If no explicit error report was received, mark the primary // node as failed. @@ -928,8 +952,7 @@ public void run() { dataQueue.notifyAll(); } if (restartingNodeIndex.get() == -1) { - DFSClient.LOG.warn("DataStreamer ResponseProcessor exception " - + " for block " + block, e); + LOG.warn("Exception for " + block, e); } responderClosed = true; } @@ -951,7 +974,7 @@ void close() { // private boolean processDatanodeError() throws IOException { if (response != null) { - DFSClient.LOG.info("Error Recovery for " + block + + LOG.info("Error Recovery for " + block + " waiting for responder to exit. "); return true; } @@ -972,7 +995,7 @@ private boolean processDatanodeError() throws IOException { // same packet, this client likely has corrupt data or corrupting // during transmission. if (++pipelineRecoveryCount > 5) { - DFSClient.LOG.warn("Error recovering pipeline for writing " + + LOG.warn("Error recovering pipeline for writing " + block + ". Already retried 5 times for the same packet."); lastException.set(new IOException("Failing write. Tried pipeline " + "recovery 5 times without success.")); @@ -1147,8 +1170,8 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException { if (nodes == null || nodes.length == 0) { String msg = "Could not get block locations. " + "Source file \"" + src + "\" - Aborting..."; - DFSClient.LOG.warn(msg); - setLastException(new IOException(msg)); + LOG.warn(msg); + lastException.set(new IOException(msg)); streamerClosed = true; return false; } @@ -1193,7 +1216,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException { streamerClosed = true; return false; } - DFSClient.LOG.warn("Error Recovery for block " + block + + LOG.warn("Error Recovery for block " + block + " in pipeline " + pipelineMsg + ": bad datanode " + nodes[errorIndex]); failed.add(nodes[errorIndex]); @@ -1227,7 +1250,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException { if (restartingNodeIndex.get() == -1) { hasError = false; } - lastException.set(null); + lastException.clear(); errorIndex = -1; } @@ -1240,7 +1263,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException { if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) { throw ioe; } - DFSClient.LOG.warn("Failed to replace datanode." + LOG.warn("Failed to replace datanode." + " Continue with the remaining datanodes since " + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY + " is set to true.", ioe); @@ -1281,7 +1304,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException { restartDeadline = 0; int expiredNodeIndex = restartingNodeIndex.get(); restartingNodeIndex.set(-1); - DFSClient.LOG.warn("Datanode did not restart in time: " + + LOG.warn("Datanode did not restart in time: " + nodes[expiredNodeIndex]); // Mark the restarting node as failed. If there is any other failed // node during the last pipeline construction attempt, it will not be @@ -1321,7 +1344,7 @@ private LocatedBlock nextBlockOutputStream() throws IOException { ExtendedBlock oldBlock = block; do { hasError = false; - lastException.set(null); + lastException.clear(); errorIndex = -1; success = false; @@ -1344,11 +1367,11 @@ private LocatedBlock nextBlockOutputStream() throws IOException { success = createBlockOutputStream(nodes, storageTypes, 0L, false); if (!success) { - DFSClient.LOG.info("Abandoning " + block); + LOG.info("Abandoning " + block); dfsClient.namenode.abandonBlock(block, stat.getFileId(), src, dfsClient.clientName); block = null; - DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]); + LOG.info("Excluding datanode " + nodes[errorIndex]); excludedNodes.put(nodes[errorIndex], nodes[errorIndex]); } } while (!success && --count >= 0); @@ -1365,17 +1388,14 @@ private LocatedBlock nextBlockOutputStream() throws IOException { private boolean createBlockOutputStream(DatanodeInfo[] nodes, StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { if (nodes.length == 0) { - DFSClient.LOG.info("nodes are empty for write pipeline of block " - + block); + LOG.info("nodes are empty for write pipeline of " + block); return false; } Status pipelineStatus = SUCCESS; String firstBadLink = ""; boolean checkRestart = false; - if (DFSClient.LOG.isDebugEnabled()) { - for (int i = 0; i < nodes.length; i++) { - DFSClient.LOG.debug("pipeline = " + nodes[i]); - } + if (LOG.isDebugEnabled()) { + LOG.debug("pipeline = " + Arrays.asList(nodes)); } // persist blocks on namenode on next flush @@ -1447,10 +1467,10 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, hasError = false; } catch (IOException ie) { if (restartingNodeIndex.get() == -1) { - DFSClient.LOG.info("Exception in createBlockOutputStream", ie); + LOG.info("Exception in createBlockOutputStream", ie); } if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { - DFSClient.LOG.info("Will fetch a new encryption key and retry, " + LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + nodes[0] + " : " + ie); // The encryption key used is invalid. @@ -1480,11 +1500,11 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, + Time.monotonicNow(); restartingNodeIndex.set(errorIndex); errorIndex = -1; - DFSClient.LOG.info("Waiting for the datanode to be restarted: " + + LOG.info("Waiting for the datanode to be restarted: " + nodes[restartingNodeIndex.get()]); } hasError = true; - setLastException(ie); + lastException.set(ie); result = false; // error } finally { if (!result) { @@ -1509,18 +1529,16 @@ private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) { new HashSet(Arrays.asList(favoredNodes)); for (int i = 0; i < nodes.length; i++) { pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname()); - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug(nodes[i].getXferAddrWithHostname() + - " was chosen by name node (favored=" + pinnings[i] + - ")."); + if (LOG.isDebugEnabled()) { + LOG.debug(nodes[i].getXferAddrWithHostname() + + " was chosen by name node (favored=" + pinnings[i] + ")."); } } if (shouldLog && !favoredSet.isEmpty()) { // There is one or more favored nodes that were not allocated. - DFSClient.LOG.warn( - "These favored nodes were specified but not chosen: " + - favoredSet + - " Specified favored nodes: " + Arrays.toString(favoredNodes)); + LOG.warn("These favored nodes were specified but not chosen: " + + favoredSet + " Specified favored nodes: " + + Arrays.toString(favoredNodes)); } return pinnings; @@ -1557,19 +1575,19 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throw e; } else { --retries; - DFSClient.LOG.info("Exception while adding a block", e); + LOG.info("Exception while adding a block", e); long elapsed = Time.monotonicNow() - localstart; if (elapsed > 5000) { - DFSClient.LOG.info("Waiting for replication for " + LOG.info("Waiting for replication for " + (elapsed / 1000) + " seconds"); } try { - DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src + LOG.warn("NotReplicatedYetException sleeping " + src + " retries left " + retries); Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { - DFSClient.LOG.warn("Caught exception ", ie); + LOG.warn("Caught exception", ie); } } } else { @@ -1606,7 +1624,7 @@ private void backOffIfNecessary() throws InterruptedException { (int)(base + Math.random() * range)); lastCongestionBackoffTime = t; sb.append(" are congested. Backing off for ").append(t).append(" ms"); - DFSClient.LOG.info(sb.toString()); + LOG.info(sb.toString()); congestedNodes.clear(); } } @@ -1642,15 +1660,6 @@ Token getBlockToken() { return accessToken; } - /** - * set last exception - * - * @param e an exception - */ - void setLastException(IOException e) { - lastException.compareAndSet(null, e); - } - /** * Put a packet to the data queue * @@ -1662,8 +1671,8 @@ void queuePacket(DFSPacket packet) { packet.addTraceParent(Trace.currentSpan()); dataQueue.addLast(packet); lastQueuedSeqno = packet.getSeqno(); - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Queued packet " + packet.getSeqno()); + if (LOG.isDebugEnabled()) { + LOG.debug("Queued packet " + packet.getSeqno()); } dataQueue.notifyAll(); } @@ -1686,7 +1695,7 @@ private LoadingCache initExcludedNodes() { @Override public void onRemoval( RemovalNotification notification) { - DFSClient.LOG.info("Removing node " + notification.getKey() + LOG.info("Removing node " + notification.getKey() + " from the excluded nodes list"); } }).build(new CacheLoader() { @@ -1730,11 +1739,9 @@ boolean getAppendChunk(){ } /** - * get the last exception - * * @return the last exception */ - AtomicReference getLastException(){ + LastException getLastException(){ return lastException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index 478f7e5447f..eac1fcd289a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -62,7 +62,6 @@ public void testCloseTwice() throws IOException { FSDataOutputStream os = fs.create(new Path("/test")); DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os, "wrappedStream"); - @SuppressWarnings("unchecked") DataStreamer streamer = (DataStreamer) Whitebox .getInternalState(dos, "streamer"); @SuppressWarnings("unchecked") @@ -122,7 +121,7 @@ public void testCongestionBackoff() throws IOException { mock(HdfsFileStatus.class), mock(ExtendedBlock.class), client, - "foo", null, null, null, null); + "foo", null, null, null, null, null); DataOutputStream blockStream = mock(DataOutputStream.class); doThrow(new IOException()).when(blockStream).flush();