HDFS-8397. Refactor the error handling code in DataStreamer. Contributed by Tsz Wo Nicholas Sze.

This commit is contained in:
Jing Zhao 2015-05-15 16:14:54 -07:00
parent f7e051c431
commit 8f37873342
2 changed files with 309 additions and 239 deletions

View File

@ -554,6 +554,9 @@ Release 2.8.0 - UNRELEASED
HDFS-6888. Allow selectively audit logging ops (Chen He via vinayakumarb)
HDFS-8397. Refactor the error handling code in DataStreamer.
(Tsz Wo Nicholas Sze via jing9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -38,7 +38,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
@ -46,6 +45,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@ -208,6 +208,133 @@ class DataStreamer extends Daemon {
}
}
static class ErrorState {
private boolean error = false;
private int badNodeIndex = -1;
private int restartingNodeIndex = -1;
private long restartingNodeDeadline = 0;
private final long datanodeRestartTimeout;
ErrorState(long datanodeRestartTimeout) {
this.datanodeRestartTimeout = datanodeRestartTimeout;
}
synchronized void reset() {
error = false;
badNodeIndex = -1;
restartingNodeIndex = -1;
restartingNodeDeadline = 0;
}
synchronized boolean hasError() {
return error;
}
synchronized boolean hasDatanodeError() {
return error && isNodeMarked();
}
synchronized void setError(boolean err) {
this.error = err;
}
synchronized void setBadNodeIndex(int index) {
this.badNodeIndex = index;
}
synchronized int getBadNodeIndex() {
return badNodeIndex;
}
synchronized int getRestartingNodeIndex() {
return restartingNodeIndex;
}
synchronized void initRestartingNode(int i, String message) {
restartingNodeIndex = i;
restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
// If the data streamer has already set the primary node
// bad, clear it. It is likely that the write failed due to
// the DN shutdown. Even if it was a real failure, the pipeline
// recovery will take care of it.
badNodeIndex = -1;
LOG.info(message);
}
synchronized boolean isRestartingNode() {
return restartingNodeIndex >= 0;
}
synchronized boolean isNodeMarked() {
return badNodeIndex >= 0 || isRestartingNode();
}
/**
* This method is used when no explicit error report was received, but
* something failed. The first node is a suspect or unsure about the cause
* so that it is marked as failed.
*/
synchronized void markFirstNodeIfNotMarked() {
// There should be no existing error and no ongoing restart.
if (!isNodeMarked()) {
badNodeIndex = 0;
}
}
synchronized void adjustState4RestartingNode() {
// Just took care of a node error while waiting for a node restart
if (restartingNodeIndex >= 0) {
// If the error came from a node further away than the restarting
// node, the restart must have been complete.
if (badNodeIndex > restartingNodeIndex) {
restartingNodeIndex = -1;
} else if (badNodeIndex < restartingNodeIndex) {
// the node index has shifted.
restartingNodeIndex--;
} else {
throw new IllegalStateException("badNodeIndex = " + badNodeIndex
+ " = restartingNodeIndex = " + restartingNodeIndex);
}
}
if (!isRestartingNode()) {
error = false;
}
badNodeIndex = -1;
}
synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
if (restartingNodeIndex >= 0) {
if (!error) {
throw new IllegalStateException("error=false while checking" +
" restarting node deadline");
}
// check badNodeIndex
if (badNodeIndex == restartingNodeIndex) {
// ignore, if came from the restarting node
badNodeIndex = -1;
}
// not within the deadline
if (Time.monotonicNow() >= restartingNodeDeadline) {
// expired. declare the restarting node dead
restartingNodeDeadline = 0;
final int i = restartingNodeIndex;
restartingNodeIndex = -1;
LOG.warn("Datanode " + i + " did not restart within "
+ datanodeRestartTimeout + "ms: " + nodes[i]);
// Mark the restarting node as failed. If there is any other failed
// node during the last pipeline construction attempt, it will not be
// overwritten/dropped. In this case, the restarting node will get
// excluded in the following attempt, if it still does not come up.
if (badNodeIndex == -1) {
badNodeIndex = i;
}
}
}
}
}
private volatile boolean streamerClosed = false;
private ExtendedBlock block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
@ -217,11 +344,8 @@ class DataStreamer extends Daemon {
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
private volatile StorageType[] storageTypes = null;
private volatile String[] storageIDs = null;
volatile boolean hasError = false;
volatile int errorIndex = -1;
// Restarting node index
AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
private long restartDeadline = 0; // Deadline of DN restart
private final ErrorState errorState;
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent
private final boolean isLazyPersistFile;
@ -287,11 +411,13 @@ class DataStreamer extends Daemon {
this.cachingStrategy = cachingStrategy;
this.byteArrayManager = byteArrayManage;
this.isLazyPersistFile = isLazyPersist(stat);
this.dfsclientSlowLogThresholdMs =
dfsClient.getConf().getSlowIoWarningThresholdMs();
this.excludedNodes = initExcludedNodes();
this.isAppend = isAppend;
this.favoredNodes = favoredNodes;
final DfsClientConf conf = dfsClient.getConf();
this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
}
/**
@ -334,7 +460,6 @@ class DataStreamer extends Daemon {
void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{
// setup pipeline to append to the last block XXX retries??
setPipeline(lastBlock);
errorIndex = -1; // no errors yet.
if (nodes.length < 1) {
throw new IOException("Unable to retrieve blocks locations " +
" for last block " + block +
@ -375,6 +500,10 @@ class DataStreamer extends Daemon {
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
private boolean shouldStop() {
return streamerClosed || errorState.hasError() || !dfsClient.clientRunning;
}
/*
* streamer thread is the only thread that opens streams to datanode,
* and closes them. Any error recovery is also done by this thread.
@ -385,7 +514,7 @@ class DataStreamer extends Daemon {
TraceScope scope = NullScope.INSTANCE;
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
if (hasError && response != null) {
if (errorState.hasError() && response != null) {
try {
response.close();
response.join();
@ -398,17 +527,13 @@ class DataStreamer extends Daemon {
DFSPacket one;
try {
// process datanode IO errors if any
boolean doSleep = false;
if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
doSleep = processDatanodeError();
}
boolean doSleep = processDatanodeError();
final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
synchronized (dataQueue) {
// wait for a packet to be sent.
long now = Time.monotonicNow();
while ((!streamerClosed && !hasError && dfsClient.clientRunning
&& dataQueue.size() == 0 &&
while ((!shouldStop() && dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
stage == BlockConstructionStage.DATA_STREAMING &&
now - lastPacket < halfSocketTimeout)) || doSleep ) {
@ -424,13 +549,12 @@ class DataStreamer extends Daemon {
doSleep = false;
now = Time.monotonicNow();
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
if (shouldStop()) {
continue;
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
one = createHeartbeatPacket();
assert one != null;
} else {
try {
backOffIfNecessary();
@ -460,7 +584,7 @@ class DataStreamer extends Daemon {
LOG.debug("Append to block " + block);
}
setupPipelineForAppendOrRecovery();
if (true == streamerClosed) {
if (streamerClosed) {
continue;
}
initDataStreaming();
@ -478,8 +602,7 @@ class DataStreamer extends Daemon {
if (one.isLastPacketInBlock()) {
// wait for all data packets have been successfully acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
ackQueue.size() != 0 && dfsClient.clientRunning) {
while (!shouldStop() && ackQueue.size() != 0) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
@ -488,7 +611,7 @@ class DataStreamer extends Daemon {
}
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
if (shouldStop()) {
continue;
}
stage = BlockConstructionStage.PIPELINE_CLOSE;
@ -524,7 +647,7 @@ class DataStreamer extends Daemon {
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
tryMarkPrimaryDatanodeFailed();
errorState.markFirstNodeIfNotMarked();
throw e;
} finally {
writeScope.close();
@ -537,7 +660,7 @@ class DataStreamer extends Daemon {
bytesSent = tmpBytesSent;
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
if (shouldStop()) {
continue;
}
@ -545,12 +668,11 @@ class DataStreamer extends Daemon {
if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
ackQueue.size() != 0 && dfsClient.clientRunning) {
while (!shouldStop() && ackQueue.size() != 0) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
if (shouldStop()) {
continue;
}
@ -564,7 +686,7 @@ class DataStreamer extends Daemon {
}
} catch (Throwable e) {
// Log warning if there was a real error.
if (restartingNodeIndex.get() == -1) {
if (!errorState.isRestartingNode()) {
// Since their messages are descriptive enough, do not always
// log a verbose stack-trace WARN for quota exceptions.
if (e instanceof QuotaExceededException) {
@ -575,8 +697,8 @@ class DataStreamer extends Daemon {
}
lastException.set(e);
assert !(e instanceof NullPointerException);
hasError = true;
if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
errorState.setError(true);
if (!errorState.isNodeMarked()) {
// Not a datanode issue
streamerClosed = true;
}
@ -773,40 +895,6 @@ class DataStreamer extends Daemon {
}
}
// The following synchronized methods are used whenever
// errorIndex or restartingNodeIndex is set. This is because
// check & set needs to be atomic. Simply reading variables
// does not require a synchronization. When responder is
// not running (e.g. during pipeline recovery), there is no
// need to use these methods.
/** Set the error node index. Called by responder */
synchronized void setErrorIndex(int idx) {
errorIndex = idx;
}
/** Set the restarting node index. Called by responder */
synchronized void setRestartingNodeIndex(int idx) {
restartingNodeIndex.set(idx);
// If the data streamer has already set the primary node
// bad, clear it. It is likely that the write failed due to
// the DN shutdown. Even if it was a real failure, the pipeline
// recovery will take care of it.
errorIndex = -1;
}
/**
* This method is used when no explicit error report was received,
* but something failed. When the primary node is a suspect or
* unsure about the cause, the primary node is marked as failed.
*/
synchronized void tryMarkPrimaryDatanodeFailed() {
// There should be no existing error and no ongoing restart.
if ((errorIndex == -1) && (restartingNodeIndex.get() == -1)) {
errorIndex = 0;
}
}
/**
* Examine whether it is worth waiting for a node to restart.
* @param index the node index
@ -883,20 +971,16 @@ class DataStreamer extends Daemon {
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) &&
shouldWaitForRestart(i)) {
restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
+ Time.monotonicNow();
setRestartingNodeIndex(i);
String message = "A datanode is restarting: " + targets[i];
LOG.info(message);
final String message = "Datanode " + i + " is restarting: "
+ targets[i];
errorState.initRestartingNode(i, message);
throw new IOException(message);
}
// node error
if (reply != SUCCESS) {
setErrorIndex(i); // first bad datanode
errorState.setBadNodeIndex(i); // mark bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
" from datanode " +
targets[i]);
" for " + block + " from datanode " + targets[i]);
}
}
@ -954,14 +1038,12 @@ class DataStreamer extends Daemon {
} catch (Exception e) {
if (!responderClosed) {
lastException.set(e);
hasError = true;
// If no explicit error report was received, mark the primary
// node as failed.
tryMarkPrimaryDatanodeFailed();
errorState.setError(true);
errorState.markFirstNodeIfNotMarked();
synchronized (dataQueue) {
dataQueue.notifyAll();
}
if (restartingNodeIndex.get() == -1) {
if (!errorState.isRestartingNode()) {
LOG.warn("Exception for " + block, e);
}
responderClosed = true;
@ -978,11 +1060,16 @@ class DataStreamer extends Daemon {
}
}
// If this stream has encountered any errors so far, shutdown
// threads and mark stream as closed. Returns true if we should
// sleep for a while after returning from this call.
//
/**
* If this stream has encountered any errors, shutdown threads
* and mark the stream as closed.
*
* @return true if it should sleep for a while after returning.
*/
private boolean processDatanodeError() throws IOException {
if (!errorState.hasDatanodeError()) {
return false;
}
if (response != null) {
LOG.info("Error Recovery for " + block +
" waiting for responder to exit. ");
@ -1064,7 +1151,7 @@ class DataStreamer extends Daemon {
.append("The current failed datanode replacement policy is ")
.append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
.append("a client may configure this via '")
.append(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY)
.append(BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY)
.append("' in its configuration.")
.toString());
}
@ -1190,156 +1277,140 @@ class DataStreamer extends Daemon {
boolean success = false;
long newGS = 0L;
while (!success && !streamerClosed && dfsClient.clientRunning) {
// Sleep before reconnect if a dn is restarting.
// This process will be repeated until the deadline or the datanode
// starts back up.
if (restartingNodeIndex.get() >= 0) {
// 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this
// interval until timeout or success.
long delay = Math.min(dfsClient.getConf().getDatanodeRestartTimeout(),
4000L);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
lastException.set(new IOException("Interrupted while waiting for " +
"datanode to restart. " + nodes[restartingNodeIndex.get()]));
streamerClosed = true;
return false;
}
}
boolean isRecovery = hasError;
// remove bad datanode from list of datanodes.
// If errorIndex was not set (i.e. appends), then do not remove
// any datanodes
//
if (errorIndex >= 0) {
StringBuilder pipelineMsg = new StringBuilder();
for (int j = 0; j < nodes.length; j++) {
pipelineMsg.append(nodes[j]);
if (j < nodes.length - 1) {
pipelineMsg.append(", ");
}
}
if (nodes.length <= 1) {
lastException.set(new IOException("All datanodes " + pipelineMsg
+ " are bad. Aborting..."));
streamerClosed = true;
return false;
}
LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex]);
failed.add(nodes[errorIndex]);
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
arraycopy(nodes, newnodes, errorIndex);
final StorageType[] newStorageTypes = new StorageType[newnodes.length];
arraycopy(storageTypes, newStorageTypes, errorIndex);
final String[] newStorageIDs = new String[newnodes.length];
arraycopy(storageIDs, newStorageIDs, errorIndex);
setPipeline(newnodes, newStorageTypes, newStorageIDs);
// Just took care of a node error while waiting for a node restart
if (restartingNodeIndex.get() >= 0) {
// If the error came from a node further away than the restarting
// node, the restart must have been complete.
if (errorIndex > restartingNodeIndex.get()) {
restartingNodeIndex.set(-1);
} else if (errorIndex < restartingNodeIndex.get()) {
// the node index has shifted.
restartingNodeIndex.decrementAndGet();
} else {
// this shouldn't happen...
assert false;
}
}
if (restartingNodeIndex.get() == -1) {
hasError = false;
}
lastException.clear();
errorIndex = -1;
if (!handleRestartingDatanode()) {
return false;
}
// Check if replace-datanode policy is satisfied.
if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
nodes, isAppend, isHflushed)) {
try {
addDatanode2ExistingPipeline();
} catch(IOException ioe) {
if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
throw ioe;
}
LOG.warn("Failed to replace datanode."
+ " Continue with the remaining datanodes since "
+ HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
+ " is set to true.", ioe);
}
final boolean isRecovery = errorState.hasError();
if (!handleBadDatanode()) {
return false;
}
handleDatanodeReplacement();
// get a new generation stamp and an access token
LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
final LocatedBlock lb = updateBlockForPipeline();
newGS = lb.getBlock().getGenerationStamp();
accessToken = lb.getBlockToken();
// set up the pipeline again with the remaining nodes
if (failPacket) { // for testing
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
failPacket = false;
try {
// Give DNs time to send in bad reports. In real situations,
// good reports should follow bad ones, if client committed
// with those nodes.
Thread.sleep(2000);
} catch (InterruptedException ie) {}
} else {
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
}
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
if (restartingNodeIndex.get() >= 0) {
assert hasError == true;
// check errorIndex set above
if (errorIndex == restartingNodeIndex.get()) {
// ignore, if came from the restarting node
errorIndex = -1;
}
// still within the deadline
if (Time.monotonicNow() < restartDeadline) {
continue; // with in the deadline
}
// expired. declare the restarting node dead
restartDeadline = 0;
int expiredNodeIndex = restartingNodeIndex.get();
restartingNodeIndex.set(-1);
LOG.warn("Datanode did not restart in time: " +
nodes[expiredNodeIndex]);
// Mark the restarting node as failed. If there is any other failed
// node during the last pipeline construction attempt, it will not be
// overwritten/dropped. In this case, the restarting node will get
// excluded in the following attempt, if it still does not come up.
if (errorIndex == -1) {
errorIndex = expiredNodeIndex;
}
// From this point on, normal pipeline recovery applies.
}
failPacket4Testing();
errorState.checkRestartingNodeDeadline(nodes);
} // while
if (success) {
// update pipeline at the namenode
ExtendedBlock newBlock = new ExtendedBlock(
block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
nodes, storageIDs);
// update client side generation stamp
block = newBlock;
block = updatePipeline(newGS);
}
return false; // do not sleep, continue processing
}
/**
* Sleep if a node is restarting.
* This process is repeated until the deadline or the node starts back up.
* @return true if it should continue.
*/
private boolean handleRestartingDatanode() {
if (errorState.isRestartingNode()) {
// 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this
// interval until timeout or success.
final long delay = Math.min(errorState.datanodeRestartTimeout, 4000L);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
lastException.set(new IOException(
"Interrupted while waiting for restarting "
+ nodes[errorState.getRestartingNodeIndex()]));
streamerClosed = true;
return false;
}
}
return true;
}
/**
* Remove bad node from list of nodes if badNodeIndex was set.
* @return true if it should continue.
*/
private boolean handleBadDatanode() {
final int badNodeIndex = errorState.getBadNodeIndex();
if (badNodeIndex >= 0) {
if (nodes.length <= 1) {
lastException.set(new IOException("All datanodes "
+ Arrays.toString(nodes) + " are bad. Aborting..."));
streamerClosed = true;
return false;
}
LOG.warn("Error Recovery for " + block + " in pipeline "
+ Arrays.toString(nodes) + ": datanode " + badNodeIndex
+ "("+ nodes[badNodeIndex] + ") is bad.");
failed.add(nodes[badNodeIndex]);
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
arraycopy(nodes, newnodes, badNodeIndex);
final StorageType[] newStorageTypes = new StorageType[newnodes.length];
arraycopy(storageTypes, newStorageTypes, badNodeIndex);
final String[] newStorageIDs = new String[newnodes.length];
arraycopy(storageIDs, newStorageIDs, badNodeIndex);
setPipeline(newnodes, newStorageTypes, newStorageIDs);
errorState.adjustState4RestartingNode();
lastException.clear();
}
return true;
}
/** Add a datanode if replace-datanode policy is satisfied. */
private void handleDatanodeReplacement() throws IOException {
if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
nodes, isAppend, isHflushed)) {
try {
addDatanode2ExistingPipeline();
} catch(IOException ioe) {
if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
throw ioe;
}
LOG.warn("Failed to replace datanode."
+ " Continue with the remaining datanodes since "
+ BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
+ " is set to true.", ioe);
}
}
}
private void failPacket4Testing() {
if (failPacket) { // for testing
failPacket = false;
try {
// Give DNs time to send in bad reports. In real situations,
// good reports should follow bad ones, if client committed
// with those nodes.
Thread.sleep(2000);
} catch (InterruptedException ie) {}
}
}
LocatedBlock updateBlockForPipeline() throws IOException {
return dfsClient.namenode.updateBlockForPipeline(
block, dfsClient.clientName);
}
/** update pipeline at the namenode */
ExtendedBlock updatePipeline(long newGS) throws IOException {
final ExtendedBlock newBlock = new ExtendedBlock(
block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
nodes, storageIDs);
return newBlock;
}
/**
* Open a DataStreamer to a DataNode so that it can be written to.
* This happens when a file is created and each time a new block is allocated.
@ -1354,9 +1425,8 @@ class DataStreamer extends Daemon {
boolean success = false;
ExtendedBlock oldBlock = block;
do {
hasError = false;
errorState.reset();
lastException.clear();
errorIndex = -1;
success = false;
DatanodeInfo[] excluded =
@ -1382,8 +1452,9 @@ class DataStreamer extends Daemon {
dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
dfsClient.clientName);
block = null;
LOG.info("Excluding datanode " + nodes[errorIndex]);
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
LOG.info("Excluding datanode " + badNode);
excludedNodes.put(badNode, badNode);
}
} while (!success && --count >= 0);
@ -1464,7 +1535,7 @@ class DataStreamer extends Daemon {
// from the local datanode. Thus it is safe to treat this as a
// regular node error.
if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
restartingNodeIndex.get() == -1) {
!errorState.isRestartingNode()) {
checkRestart = true;
throw new IOException("A datanode is restarting.");
}
@ -1475,10 +1546,9 @@ class DataStreamer extends Daemon {
assert null == blockStream : "Previous blockStream unclosed";
blockStream = out;
result = true; // success
restartingNodeIndex.set(-1);
hasError = false;
errorState.reset();
} catch (IOException ie) {
if (restartingNodeIndex.get() == -1) {
if (!errorState.isRestartingNode()) {
LOG.info("Exception in createBlockOutputStream", ie);
}
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
@ -1498,24 +1568,21 @@ class DataStreamer extends Daemon {
for (int i = 0; i < nodes.length; i++) {
// NB: Unconditionally using the xfer addr w/o hostname
if (firstBadLink.equals(nodes[i].getXferAddr())) {
errorIndex = i;
errorState.setBadNodeIndex(i);
break;
}
}
} else {
assert checkRestart == false;
errorIndex = 0;
errorState.setBadNodeIndex(0);
}
final int i = errorState.getBadNodeIndex();
// Check whether there is a restart worth waiting for.
if (checkRestart && shouldWaitForRestart(errorIndex)) {
restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
+ Time.monotonicNow();
restartingNodeIndex.set(errorIndex);
errorIndex = -1;
LOG.info("Waiting for the datanode to be restarted: " +
nodes[restartingNodeIndex.get()]);
if (checkRestart && shouldWaitForRestart(i)) {
errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]);
}
hasError = true;
errorState.setError(true);
lastException.set(ie);
result = false; // error
} finally {
@ -1699,10 +1766,10 @@ class DataStreamer extends Daemon {
return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, 0, false);
}
private LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes() {
return CacheBuilder.newBuilder().expireAfterWrite(
dfsClient.getConf().getExcludedNodesCacheExpiry(),
TimeUnit.MILLISECONDS)
private static LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes(
long excludedNodesCacheExpiry) {
return CacheBuilder.newBuilder()
.expireAfterWrite(excludedNodesCacheExpiry, TimeUnit.MILLISECONDS)
.removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
@Override
public void onRemoval(