diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 62b1d97a9ad..d44030da754 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -221,6 +221,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-6888. Allow selectively audit logging ops (Chen He via vinayakumarb)
 
+    HDFS-8397. Refactor the error handling code in DataStreamer.
+    (Tsz Wo Nicholas Sze via jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index b472b8c6451..cecd5a0eccf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -38,7 +38,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
@@ -46,6 +45,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@@ -208,6 +208,133 @@ class DataStreamer extends Daemon {
     }
   }
 
+  static class ErrorState {
+    private boolean error = false;
+    private int badNodeIndex = -1;
+    private int restartingNodeIndex = -1;
+    private long restartingNodeDeadline = 0;
+    private final long datanodeRestartTimeout;
+
+    ErrorState(long datanodeRestartTimeout) {
+      this.datanodeRestartTimeout = datanodeRestartTimeout;
+    }
+
+    synchronized void reset() {
+      error = false;
+      badNodeIndex = -1;
+      restartingNodeIndex = -1;
+      restartingNodeDeadline = 0;
+    }
+
+    synchronized boolean hasError() {
+      return error;
+    }
+
+    synchronized boolean hasDatanodeError() {
+      return error && isNodeMarked();
+    }
+
+    synchronized void setError(boolean err) {
+      this.error = err;
+    }
+
+    synchronized void setBadNodeIndex(int index) {
+      this.badNodeIndex = index;
+    }
+
+    synchronized int getBadNodeIndex() {
+      return badNodeIndex;
+    }
+
+    synchronized int getRestartingNodeIndex() {
+      return restartingNodeIndex;
+    }
+
+    synchronized void initRestartingNode(int i, String message) {
+      restartingNodeIndex = i;
+      restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
+      // If the data streamer has already set the primary node
+      // bad, clear it. It is likely that the write failed due to
+      // the DN shutdown. Even if it was a real failure, the pipeline
+      // recovery will take care of it.
+      badNodeIndex = -1;
+      LOG.info(message);
+    }
+
+    synchronized boolean isRestartingNode() {
+      return restartingNodeIndex >= 0;
+    }
+
+    synchronized boolean isNodeMarked() {
+      return badNodeIndex >= 0 || isRestartingNode();
+    }
+
+    /**
+     * This method is used when no explicit error report was received, but
+     * something failed. Either the first node is the suspect or the cause is
+     * unknown, so the first node is marked as failed.
+     */
+    synchronized void markFirstNodeIfNotMarked() {
+      // There should be no existing error and no ongoing restart.
+      if (!isNodeMarked()) {
+        badNodeIndex = 0;
+      }
+    }
+
+    synchronized void adjustState4RestartingNode() {
+      // Just took care of a node error while waiting for a node restart
+      if (restartingNodeIndex >= 0) {
+        // If the error came from a node further away than the restarting
+        // node, the restart must have been complete.
+        if (badNodeIndex > restartingNodeIndex) {
+          restartingNodeIndex = -1;
+        } else if (badNodeIndex < restartingNodeIndex) {
+          // the node index has shifted.
+          restartingNodeIndex--;
+        } else {
+          throw new IllegalStateException("badNodeIndex = " + badNodeIndex
+              + " = restartingNodeIndex = " + restartingNodeIndex);
+        }
+      }
+
+      if (!isRestartingNode()) {
+        error = false;
+      }
+      badNodeIndex = -1;
+    }
+
+    synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
+      if (restartingNodeIndex >= 0) {
+        if (!error) {
+          throw new IllegalStateException("error=false while checking"
+              + " restarting node deadline");
+        }
+
+        // check badNodeIndex
+        if (badNodeIndex == restartingNodeIndex) {
+          // ignore, if came from the restarting node
+          badNodeIndex = -1;
+        }
+        // not within the deadline
+        if (Time.monotonicNow() >= restartingNodeDeadline) {
+          // expired. declare the restarting node dead
+          restartingNodeDeadline = 0;
+          final int i = restartingNodeIndex;
+          restartingNodeIndex = -1;
+          LOG.warn("Datanode " + i + " did not restart within "
+              + datanodeRestartTimeout + "ms: " + nodes[i]);
+          // Mark the restarting node as failed. If there is any other failed
+          // node during the last pipeline construction attempt, it will not be
+          // overwritten/dropped. In this case, the restarting node will get
+          // excluded in the following attempt, if it still does not come up.
+          if (badNodeIndex == -1) {
+            badNodeIndex = i;
+          }
+        }
+      }
+    }
+  }
+
   private volatile boolean streamerClosed = false;
   private ExtendedBlock block; // its length is number of bytes acked
   private Token<BlockTokenIdentifier> accessToken;
@@ -217,11 +344,8 @@ class DataStreamer extends Daemon {
   private volatile DatanodeInfo[] nodes = null; // list of targets for current block
   private volatile StorageType[] storageTypes = null;
   private volatile String[] storageIDs = null;
-  volatile boolean hasError = false;
-  volatile int errorIndex = -1;
-  // Restarting node index
-  AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
-  private long restartDeadline = 0; // Deadline of DN restart
+  private final ErrorState errorState;
+
   private BlockConstructionStage stage;  // block construction stage
   private long bytesSent = 0; // number of bytes that've been sent
   private final boolean isLazyPersistFile;
@@ -287,11 +411,13 @@ class DataStreamer extends Daemon {
     this.cachingStrategy = cachingStrategy;
     this.byteArrayManager = byteArrayManage;
     this.isLazyPersistFile = isLazyPersist(stat);
-    this.dfsclientSlowLogThresholdMs =
-        dfsClient.getConf().getSlowIoWarningThresholdMs();
-    this.excludedNodes = initExcludedNodes();
     this.isAppend = isAppend;
     this.favoredNodes = favoredNodes;
+
+    final DfsClientConf conf = dfsClient.getConf();
+    this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
+    this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
+    this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
   }
 
   /**
@@ -334,7 +460,6 @@ class DataStreamer extends Daemon {
   void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{
     // setup pipeline to append to the last block XXX retries??
     setPipeline(lastBlock);
-    errorIndex = -1; // no errors yet.
     if (nodes.length < 1) {
       throw new IOException("Unable to retrieve blocks locations " +
           " for last block " + block +
@@ -375,6 +500,10 @@ class DataStreamer extends Daemon {
     stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
   }
 
+  private boolean shouldStop() {
+    return streamerClosed || errorState.hasError() || !dfsClient.clientRunning;
+  }
+
   /*
    * streamer thread is the only thread that opens streams to datanode,
    * and closes them. Any error recovery is also done by this thread.
@@ -385,7 +514,7 @@ class DataStreamer extends Daemon {
     TraceScope scope = NullScope.INSTANCE;
     while (!streamerClosed && dfsClient.clientRunning) {
       // if the Responder encountered an error, shutdown Responder
-      if (hasError && response != null) {
+      if (errorState.hasError() && response != null) {
         try {
           response.close();
           response.join();
@@ -398,17 +527,13 @@ class DataStreamer extends Daemon {
       DFSPacket one;
       try {
         // process datanode IO errors if any
-        boolean doSleep = false;
-        if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
-          doSleep = processDatanodeError();
-        }
+        boolean doSleep = processDatanodeError();
 
         final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
         synchronized (dataQueue) {
           // wait for a packet to be sent.
           long now = Time.monotonicNow();
-          while ((!streamerClosed && !hasError && dfsClient.clientRunning
-              && dataQueue.size() == 0 &&
+          while ((!shouldStop() && dataQueue.size() == 0 &&
               (stage != BlockConstructionStage.DATA_STREAMING ||
                   stage == BlockConstructionStage.DATA_STREAMING &&
                       now - lastPacket < halfSocketTimeout)) || doSleep ) {
@@ -424,13 +549,12 @@ class DataStreamer extends Daemon {
             doSleep = false;
             now = Time.monotonicNow();
           }
-          if (streamerClosed || hasError || !dfsClient.clientRunning) {
+          if (shouldStop()) {
            continue;
          }
          // get packet to be sent.
          if (dataQueue.isEmpty()) {
            one = createHeartbeatPacket();
-            assert one != null;
          } else {
            try {
              backOffIfNecessary();
@@ -460,7 +584,7 @@ class DataStreamer extends Daemon {
               LOG.debug("Append to block " + block);
             }
             setupPipelineForAppendOrRecovery();
-            if (true == streamerClosed) {
+            if (streamerClosed) {
               continue;
             }
             initDataStreaming();
@@ -478,8 +602,7 @@ class DataStreamer extends Daemon {
         if (one.isLastPacketInBlock()) {
           // wait for all data packets have been successfully acked
           synchronized (dataQueue) {
-            while (!streamerClosed && !hasError &&
-                ackQueue.size() != 0 && dfsClient.clientRunning) {
+            while (!shouldStop() && ackQueue.size() != 0) {
               try {
                 // wait for acks to arrive from datanodes
                 dataQueue.wait(1000);
@@ -488,7 +611,7 @@ class DataStreamer extends Daemon {
               }
             }
           }
-          if (streamerClosed || hasError || !dfsClient.clientRunning) {
+          if (shouldStop()) {
            continue;
          }
          stage = BlockConstructionStage.PIPELINE_CLOSE;
@@ -524,7 +647,7 @@ class DataStreamer extends Daemon {
           // effect. Pipeline recovery can handle only one node error at a
           // time. If the primary node fails again during the recovery, it
           // will be taken out then.
-          tryMarkPrimaryDatanodeFailed();
+          errorState.markFirstNodeIfNotMarked();
           throw e;
         } finally {
           writeScope.close();
         }
@@ -537,7 +660,7 @@ class DataStreamer extends Daemon {
            bytesSent = tmpBytesSent;
          }
 
-        if (streamerClosed || hasError || !dfsClient.clientRunning) {
+        if (shouldStop()) {
          continue;
        }
 
@@ -545,12 +668,11 @@ class DataStreamer extends Daemon {
        if (one.isLastPacketInBlock()) {
          // wait for the close packet has been acked
          synchronized (dataQueue) {
-            while (!streamerClosed && !hasError &&
-                ackQueue.size() != 0 && dfsClient.clientRunning) {
+            while (!shouldStop() && ackQueue.size() != 0) {
              dataQueue.wait(1000);// wait for acks to arrive from datanodes
            }
          }
-          if (streamerClosed || hasError || !dfsClient.clientRunning) {
+          if (shouldStop()) {
            continue;
          }
 
@@ -564,7 +686,7 @@ class DataStreamer extends Daemon {
        }
      } catch (Throwable e) {
        // Log warning if there was a real error.
-        if (restartingNodeIndex.get() == -1) {
+        if (!errorState.isRestartingNode()) {
          // Since their messages are descriptive enough, do not always
          // log a verbose stack-trace WARN for quota exceptions.
          if (e instanceof QuotaExceededException) {
@@ -575,8 +697,8 @@ class DataStreamer extends Daemon {
        }
        lastException.set(e);
        assert !(e instanceof NullPointerException);
-        hasError = true;
-        if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
+        errorState.setError(true);
+        if (!errorState.isNodeMarked()) {
          // Not a datanode issue
          streamerClosed = true;
        }
@@ -773,40 +895,6 @@ class DataStreamer extends Daemon {
     }
   }
 
-  // The following synchronized methods are used whenever
-  // errorIndex or restartingNodeIndex is set. This is because
-  // check & set needs to be atomic. Simply reading variables
-  // does not require a synchronization. When responder is
-  // not running (e.g. during pipeline recovery), there is no
-  // need to use these methods.
-
-  /** Set the error node index. Called by responder */
-  synchronized void setErrorIndex(int idx) {
-    errorIndex = idx;
-  }
-
-  /** Set the restarting node index. Called by responder */
-  synchronized void setRestartingNodeIndex(int idx) {
-    restartingNodeIndex.set(idx);
-    // If the data streamer has already set the primary node
-    // bad, clear it. It is likely that the write failed due to
-    // the DN shutdown. Even if it was a real failure, the pipeline
-    // recovery will take care of it.
-    errorIndex = -1;
-  }
-
-  /**
-   * This method is used when no explicit error report was received,
-   * but something failed. When the primary node is a suspect or
-   * unsure about the cause, the primary node is marked as failed.
-   */
-  synchronized void tryMarkPrimaryDatanodeFailed() {
-    // There should be no existing error and no ongoing restart.
-    if ((errorIndex == -1) && (restartingNodeIndex.get() == -1)) {
-      errorIndex = 0;
-    }
-  }
-
   /**
    * Examine whether it is worth waiting for a node to restart.
    * @param index the node index
@@ -883,20 +971,16 @@ class DataStreamer extends Daemon {
           // the local node or the only one in the pipeline.
           if (PipelineAck.isRestartOOBStatus(reply) &&
              shouldWaitForRestart(i)) {
-            restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
-                + Time.monotonicNow();
-            setRestartingNodeIndex(i);
-            String message = "A datanode is restarting: " + targets[i];
-            LOG.info(message);
+            final String message = "Datanode " + i + " is restarting: "
+                + targets[i];
+            errorState.initRestartingNode(i, message);
            throw new IOException(message);
          }
          // node error
          if (reply != SUCCESS) {
-            setErrorIndex(i); // first bad datanode
+            errorState.setBadNodeIndex(i); // mark bad datanode
            throw new IOException("Bad response " + reply +
-                " for block " + block +
-                " from datanode " +
-                targets[i]);
+                " for " + block + " from datanode " + targets[i]);
          }
        }
 
@@ -954,14 +1038,12 @@ class DataStreamer extends Daemon {
      } catch (Exception e) {
        if (!responderClosed) {
          lastException.set(e);
-          hasError = true;
-          // If no explicit error report was received, mark the primary
-          // node as failed.
-          tryMarkPrimaryDatanodeFailed();
+          errorState.setError(true);
+          errorState.markFirstNodeIfNotMarked();
          synchronized (dataQueue) {
            dataQueue.notifyAll();
          }
-          if (restartingNodeIndex.get() == -1) {
+          if (!errorState.isRestartingNode()) {
            LOG.warn("Exception for " + block, e);
          }
          responderClosed = true;
@@ -978,11 +1060,16 @@ class DataStreamer extends Daemon {
     }
   }
 
-  // If this stream has encountered any errors so far, shutdown
-  // threads and mark stream as closed. Returns true if we should
-  // sleep for a while after returning from this call.
-  //
+  /**
+   * If this stream has encountered any errors, shutdown threads
+   * and mark the stream as closed.
+   *
+   * @return true if it should sleep for a while after returning.
+   */
  private boolean processDatanodeError() throws IOException {
+    if (!errorState.hasDatanodeError()) {
+      return false;
+    }
    if (response != null) {
      LOG.info("Error Recovery for " + block + " waiting for responder to exit. ");
    }
@@ -1064,7 +1151,7 @@ class DataStreamer extends Daemon {
              .append("The current failed datanode replacement policy is ")
              .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
              .append("a client may configure this via '")
-              .append(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY)
+              .append(BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY)
              .append("' in its configuration.")
              .toString());
        }
@@ -1190,156 +1277,140 @@ class DataStreamer extends Daemon {
     boolean success = false;
     long newGS = 0L;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
-      // Sleep before reconnect if a dn is restarting.
-      // This process will be repeated until the deadline or the datanode
-      // starts back up.
-      if (restartingNodeIndex.get() >= 0) {
-        // 4 seconds or the configured deadline period, whichever is shorter.
-        // This is the retry interval and recovery will be retried in this
-        // interval until timeout or success.
-        long delay = Math.min(dfsClient.getConf().getDatanodeRestartTimeout(),
-            4000L);
-        try {
-          Thread.sleep(delay);
-        } catch (InterruptedException ie) {
-          lastException.set(new IOException("Interrupted while waiting for " +
-              "datanode to restart. " + nodes[restartingNodeIndex.get()]));
-          streamerClosed = true;
-          return false;
-        }
-      }
-      boolean isRecovery = hasError;
-      // remove bad datanode from list of datanodes.
-      // If errorIndex was not set (i.e. appends), then do not remove
-      // any datanodes
-      //
-      if (errorIndex >= 0) {
-        StringBuilder pipelineMsg = new StringBuilder();
-        for (int j = 0; j < nodes.length; j++) {
-          pipelineMsg.append(nodes[j]);
-          if (j < nodes.length - 1) {
-            pipelineMsg.append(", ");
-          }
-        }
-        if (nodes.length <= 1) {
-          lastException.set(new IOException("All datanodes " + pipelineMsg
-              + " are bad. Aborting..."));
-          streamerClosed = true;
-          return false;
-        }
-        LOG.warn("Error Recovery for block " + block +
-            " in pipeline " + pipelineMsg +
-            ": bad datanode " + nodes[errorIndex]);
-        failed.add(nodes[errorIndex]);
-
-        DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
-        arraycopy(nodes, newnodes, errorIndex);
-
-        final StorageType[] newStorageTypes = new StorageType[newnodes.length];
-        arraycopy(storageTypes, newStorageTypes, errorIndex);
-
-        final String[] newStorageIDs = new String[newnodes.length];
-        arraycopy(storageIDs, newStorageIDs, errorIndex);
-
-        setPipeline(newnodes, newStorageTypes, newStorageIDs);
-
-        // Just took care of a node error while waiting for a node restart
-        if (restartingNodeIndex.get() >= 0) {
-          // If the error came from a node further away than the restarting
-          // node, the restart must have been complete.
-          if (errorIndex > restartingNodeIndex.get()) {
-            restartingNodeIndex.set(-1);
-          } else if (errorIndex < restartingNodeIndex.get()) {
-            // the node index has shifted.
-            restartingNodeIndex.decrementAndGet();
-          } else {
-            // this shouldn't happen...
-            assert false;
-          }
-        }
-
-        if (restartingNodeIndex.get() == -1) {
-          hasError = false;
-        }
-        lastException.clear();
-        errorIndex = -1;
+      if (!handleRestartingDatanode()) {
+        return false;
       }
 
-      // Check if replace-datanode policy is satisfied.
-      if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
-          nodes, isAppend, isHflushed)) {
-        try {
-          addDatanode2ExistingPipeline();
-        } catch(IOException ioe) {
-          if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
-            throw ioe;
-          }
-          LOG.warn("Failed to replace datanode."
-              + " Continue with the remaining datanodes since "
-              + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
-              + " is set to true.", ioe);
-        }
+      final boolean isRecovery = errorState.hasError();
+      if (!handleBadDatanode()) {
+        return false;
       }
 
+      handleDatanodeReplacement();
+
       // get a new generation stamp and an access token
-      LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
+      final LocatedBlock lb = updateBlockForPipeline();
       newGS = lb.getBlock().getGenerationStamp();
       accessToken = lb.getBlockToken();
 
       // set up the pipeline again with the remaining nodes
-      if (failPacket) { // for testing
-        success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
-        failPacket = false;
-        try {
-          // Give DNs time to send in bad reports. In real situations,
-          // good reports should follow bad ones, if client committed
-          // with those nodes.
-          Thread.sleep(2000);
-        } catch (InterruptedException ie) {}
-      } else {
-        success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
-      }
+      success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
 
-      if (restartingNodeIndex.get() >= 0) {
-        assert hasError == true;
-        // check errorIndex set above
-        if (errorIndex == restartingNodeIndex.get()) {
-          // ignore, if came from the restarting node
-          errorIndex = -1;
-        }
-        // still within the deadline
-        if (Time.monotonicNow() < restartDeadline) {
-          continue; // with in the deadline
-        }
-        // expired. declare the restarting node dead
-        restartDeadline = 0;
-        int expiredNodeIndex = restartingNodeIndex.get();
-        restartingNodeIndex.set(-1);
-        LOG.warn("Datanode did not restart in time: " +
-            nodes[expiredNodeIndex]);
-        // Mark the restarting node as failed. If there is any other failed
-        // node during the last pipeline construction attempt, it will not be
-        // overwritten/dropped. In this case, the restarting node will get
-        // excluded in the following attempt, if it still does not come up.
-        if (errorIndex == -1) {
-          errorIndex = expiredNodeIndex;
-        }
-        // From this point on, normal pipeline recovery applies.
-      }
+      failPacket4Testing();
+
+      errorState.checkRestartingNodeDeadline(nodes);
     } // while
 
     if (success) {
-      // update pipeline at the namenode
-      ExtendedBlock newBlock = new ExtendedBlock(
-          block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
-      dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
-          nodes, storageIDs);
-      // update client side generation stamp
-      block = newBlock;
+      block = updatePipeline(newGS);
     }
     return false; // do not sleep, continue processing
   }
 
+  /**
+   * Sleep if a node is restarting.
+   * This process is repeated until the deadline or the node starts back up.
+   * @return true if it should continue.
+   */
+  private boolean handleRestartingDatanode() {
+    if (errorState.isRestartingNode()) {
+      // 4 seconds or the configured deadline period, whichever is shorter.
+      // This is the retry interval and recovery will be retried in this
+      // interval until timeout or success.
+      final long delay = Math.min(errorState.datanodeRestartTimeout, 4000L);
+      try {
+        Thread.sleep(delay);
+      } catch (InterruptedException ie) {
+        lastException.set(new IOException(
+            "Interrupted while waiting for restarting "
+            + nodes[errorState.getRestartingNodeIndex()]));
+        streamerClosed = true;
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Remove bad node from list of nodes if badNodeIndex was set.
+   * @return true if it should continue.
+   */
+  private boolean handleBadDatanode() {
+    final int badNodeIndex = errorState.getBadNodeIndex();
+    if (badNodeIndex >= 0) {
+      if (nodes.length <= 1) {
+        lastException.set(new IOException("All datanodes "
+            + Arrays.toString(nodes) + " are bad. Aborting..."));
+        streamerClosed = true;
+        return false;
+      }
+
+      LOG.warn("Error Recovery for " + block + " in pipeline "
+          + Arrays.toString(nodes) + ": datanode " + badNodeIndex
+          + "("+ nodes[badNodeIndex] + ") is bad.");
+      failed.add(nodes[badNodeIndex]);
+
+      DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
+      arraycopy(nodes, newnodes, badNodeIndex);
+
+      final StorageType[] newStorageTypes = new StorageType[newnodes.length];
+      arraycopy(storageTypes, newStorageTypes, badNodeIndex);
+
+      final String[] newStorageIDs = new String[newnodes.length];
+      arraycopy(storageIDs, newStorageIDs, badNodeIndex);
+
+      setPipeline(newnodes, newStorageTypes, newStorageIDs);
+
+      errorState.adjustState4RestartingNode();
+      lastException.clear();
+    }
+    return true;
+  }
+
+  /** Add a datanode if replace-datanode policy is satisfied. */
+  private void handleDatanodeReplacement() throws IOException {
+    if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
+        nodes, isAppend, isHflushed)) {
+      try {
+        addDatanode2ExistingPipeline();
+      } catch(IOException ioe) {
+        if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
+          throw ioe;
+        }
+        LOG.warn("Failed to replace datanode."
+ + " Continue with the remaining datanodes since " + + BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY + + " is set to true.", ioe); + } + } + } + + private void failPacket4Testing() { + if (failPacket) { // for testing + failPacket = false; + try { + // Give DNs time to send in bad reports. In real situations, + // good reports should follow bad ones, if client committed + // with those nodes. + Thread.sleep(2000); + } catch (InterruptedException ie) {} + } + } + + LocatedBlock updateBlockForPipeline() throws IOException { + return dfsClient.namenode.updateBlockForPipeline( + block, dfsClient.clientName); + } + + /** update pipeline at the namenode */ + ExtendedBlock updatePipeline(long newGS) throws IOException { + final ExtendedBlock newBlock = new ExtendedBlock( + block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS); + dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, + nodes, storageIDs); + return newBlock; + } + /** * Open a DataStreamer to a DataNode so that it can be written to. * This happens when a file is created and each time a new block is allocated. @@ -1354,9 +1425,8 @@ class DataStreamer extends Daemon { boolean success = false; ExtendedBlock oldBlock = block; do { - hasError = false; + errorState.reset(); lastException.clear(); - errorIndex = -1; success = false; DatanodeInfo[] excluded = @@ -1382,8 +1452,9 @@ class DataStreamer extends Daemon { dfsClient.namenode.abandonBlock(block, stat.getFileId(), src, dfsClient.clientName); block = null; - LOG.info("Excluding datanode " + nodes[errorIndex]); - excludedNodes.put(nodes[errorIndex], nodes[errorIndex]); + final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; + LOG.info("Excluding datanode " + badNode); + excludedNodes.put(badNode, badNode); } } while (!success && --count >= 0); @@ -1464,7 +1535,7 @@ class DataStreamer extends Daemon { // from the local datanode. Thus it is safe to treat this as a // regular node error. if (PipelineAck.isRestartOOBStatus(pipelineStatus) && - restartingNodeIndex.get() == -1) { + !errorState.isRestartingNode()) { checkRestart = true; throw new IOException("A datanode is restarting."); } @@ -1475,10 +1546,9 @@ class DataStreamer extends Daemon { assert null == blockStream : "Previous blockStream unclosed"; blockStream = out; result = true; // success - restartingNodeIndex.set(-1); - hasError = false; + errorState.reset(); } catch (IOException ie) { - if (restartingNodeIndex.get() == -1) { + if (!errorState.isRestartingNode()) { LOG.info("Exception in createBlockOutputStream", ie); } if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { @@ -1498,24 +1568,21 @@ class DataStreamer extends Daemon { for (int i = 0; i < nodes.length; i++) { // NB: Unconditionally using the xfer addr w/o hostname if (firstBadLink.equals(nodes[i].getXferAddr())) { - errorIndex = i; + errorState.setBadNodeIndex(i); break; } } } else { assert checkRestart == false; - errorIndex = 0; + errorState.setBadNodeIndex(0); } + + final int i = errorState.getBadNodeIndex(); // Check whether there is a restart worth waiting for. 
-        if (checkRestart && shouldWaitForRestart(errorIndex)) {
-          restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
-              + Time.monotonicNow();
-          restartingNodeIndex.set(errorIndex);
-          errorIndex = -1;
-          LOG.info("Waiting for the datanode to be restarted: " +
-              nodes[restartingNodeIndex.get()]);
+        if (checkRestart && shouldWaitForRestart(i)) {
+          errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]);
        }
-        hasError = true;
+        errorState.setError(true);
        lastException.set(ie);
        result = false; // error
      } finally {
@@ -1699,10 +1766,10 @@ class DataStreamer extends Daemon {
     return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, 0, false);
   }
 
-  private LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes() {
-    return CacheBuilder.newBuilder().expireAfterWrite(
-        dfsClient.getConf().getExcludedNodesCacheExpiry(),
-        TimeUnit.MILLISECONDS)
+  private static LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes(
+      long excludedNodesCacheExpiry) {
+    return CacheBuilder.newBuilder()
+        .expireAfterWrite(excludedNodesCacheExpiry, TimeUnit.MILLISECONDS)
         .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
          @Override
          public void onRemoval(