HDFS-8397. Refactor the error handling code in DataStreamer. Contributed by Tsz Wo Nicholas Sze.
(cherry picked from commit 8f37873342
)
This commit is contained in:
parent
ce64720516
commit
d6aa65d037
|
@ -221,6 +221,9 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
HDFS-6888. Allow selectively audit logging ops (Chen He via vinayakumarb)
|
||||
|
||||
HDFS-8397. Refactor the error handling code in DataStreamer.
|
||||
(Tsz Wo Nicholas Sze via jing9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -38,7 +38,6 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -46,6 +45,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||
|
@ -208,6 +208,133 @@ class DataStreamer extends Daemon {
|
|||
}
|
||||
}
|
||||
|
||||
static class ErrorState {
|
||||
private boolean error = false;
|
||||
private int badNodeIndex = -1;
|
||||
private int restartingNodeIndex = -1;
|
||||
private long restartingNodeDeadline = 0;
|
||||
private final long datanodeRestartTimeout;
|
||||
|
||||
ErrorState(long datanodeRestartTimeout) {
|
||||
this.datanodeRestartTimeout = datanodeRestartTimeout;
|
||||
}
|
||||
|
||||
synchronized void reset() {
|
||||
error = false;
|
||||
badNodeIndex = -1;
|
||||
restartingNodeIndex = -1;
|
||||
restartingNodeDeadline = 0;
|
||||
}
|
||||
|
||||
synchronized boolean hasError() {
|
||||
return error;
|
||||
}
|
||||
|
||||
synchronized boolean hasDatanodeError() {
|
||||
return error && isNodeMarked();
|
||||
}
|
||||
|
||||
synchronized void setError(boolean err) {
|
||||
this.error = err;
|
||||
}
|
||||
|
||||
synchronized void setBadNodeIndex(int index) {
|
||||
this.badNodeIndex = index;
|
||||
}
|
||||
|
||||
synchronized int getBadNodeIndex() {
|
||||
return badNodeIndex;
|
||||
}
|
||||
|
||||
synchronized int getRestartingNodeIndex() {
|
||||
return restartingNodeIndex;
|
||||
}
|
||||
|
||||
synchronized void initRestartingNode(int i, String message) {
|
||||
restartingNodeIndex = i;
|
||||
restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
|
||||
// If the data streamer has already set the primary node
|
||||
// bad, clear it. It is likely that the write failed due to
|
||||
// the DN shutdown. Even if it was a real failure, the pipeline
|
||||
// recovery will take care of it.
|
||||
badNodeIndex = -1;
|
||||
LOG.info(message);
|
||||
}
|
||||
|
||||
synchronized boolean isRestartingNode() {
|
||||
return restartingNodeIndex >= 0;
|
||||
}
|
||||
|
||||
synchronized boolean isNodeMarked() {
|
||||
return badNodeIndex >= 0 || isRestartingNode();
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used when no explicit error report was received, but
|
||||
* something failed. The first node is a suspect or unsure about the cause
|
||||
* so that it is marked as failed.
|
||||
*/
|
||||
synchronized void markFirstNodeIfNotMarked() {
|
||||
// There should be no existing error and no ongoing restart.
|
||||
if (!isNodeMarked()) {
|
||||
badNodeIndex = 0;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void adjustState4RestartingNode() {
|
||||
// Just took care of a node error while waiting for a node restart
|
||||
if (restartingNodeIndex >= 0) {
|
||||
// If the error came from a node further away than the restarting
|
||||
// node, the restart must have been complete.
|
||||
if (badNodeIndex > restartingNodeIndex) {
|
||||
restartingNodeIndex = -1;
|
||||
} else if (badNodeIndex < restartingNodeIndex) {
|
||||
// the node index has shifted.
|
||||
restartingNodeIndex--;
|
||||
} else {
|
||||
throw new IllegalStateException("badNodeIndex = " + badNodeIndex
|
||||
+ " = restartingNodeIndex = " + restartingNodeIndex);
|
||||
}
|
||||
}
|
||||
|
||||
if (!isRestartingNode()) {
|
||||
error = false;
|
||||
}
|
||||
badNodeIndex = -1;
|
||||
}
|
||||
|
||||
synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
|
||||
if (restartingNodeIndex >= 0) {
|
||||
if (!error) {
|
||||
throw new IllegalStateException("error=false while checking" +
|
||||
" restarting node deadline");
|
||||
}
|
||||
|
||||
// check badNodeIndex
|
||||
if (badNodeIndex == restartingNodeIndex) {
|
||||
// ignore, if came from the restarting node
|
||||
badNodeIndex = -1;
|
||||
}
|
||||
// not within the deadline
|
||||
if (Time.monotonicNow() >= restartingNodeDeadline) {
|
||||
// expired. declare the restarting node dead
|
||||
restartingNodeDeadline = 0;
|
||||
final int i = restartingNodeIndex;
|
||||
restartingNodeIndex = -1;
|
||||
LOG.warn("Datanode " + i + " did not restart within "
|
||||
+ datanodeRestartTimeout + "ms: " + nodes[i]);
|
||||
// Mark the restarting node as failed. If there is any other failed
|
||||
// node during the last pipeline construction attempt, it will not be
|
||||
// overwritten/dropped. In this case, the restarting node will get
|
||||
// excluded in the following attempt, if it still does not come up.
|
||||
if (badNodeIndex == -1) {
|
||||
badNodeIndex = i;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private volatile boolean streamerClosed = false;
|
||||
private ExtendedBlock block; // its length is number of bytes acked
|
||||
private Token<BlockTokenIdentifier> accessToken;
|
||||
|
@ -217,11 +344,8 @@ class DataStreamer extends Daemon {
|
|||
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
|
||||
private volatile StorageType[] storageTypes = null;
|
||||
private volatile String[] storageIDs = null;
|
||||
volatile boolean hasError = false;
|
||||
volatile int errorIndex = -1;
|
||||
// Restarting node index
|
||||
AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
|
||||
private long restartDeadline = 0; // Deadline of DN restart
|
||||
private final ErrorState errorState;
|
||||
|
||||
private BlockConstructionStage stage; // block construction stage
|
||||
private long bytesSent = 0; // number of bytes that've been sent
|
||||
private final boolean isLazyPersistFile;
|
||||
|
@ -287,11 +411,13 @@ class DataStreamer extends Daemon {
|
|||
this.cachingStrategy = cachingStrategy;
|
||||
this.byteArrayManager = byteArrayManage;
|
||||
this.isLazyPersistFile = isLazyPersist(stat);
|
||||
this.dfsclientSlowLogThresholdMs =
|
||||
dfsClient.getConf().getSlowIoWarningThresholdMs();
|
||||
this.excludedNodes = initExcludedNodes();
|
||||
this.isAppend = isAppend;
|
||||
this.favoredNodes = favoredNodes;
|
||||
|
||||
final DfsClientConf conf = dfsClient.getConf();
|
||||
this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
|
||||
this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
|
||||
this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -334,7 +460,6 @@ class DataStreamer extends Daemon {
|
|||
void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{
|
||||
// setup pipeline to append to the last block XXX retries??
|
||||
setPipeline(lastBlock);
|
||||
errorIndex = -1; // no errors yet.
|
||||
if (nodes.length < 1) {
|
||||
throw new IOException("Unable to retrieve blocks locations " +
|
||||
" for last block " + block +
|
||||
|
@ -375,6 +500,10 @@ class DataStreamer extends Daemon {
|
|||
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||
}
|
||||
|
||||
private boolean shouldStop() {
|
||||
return streamerClosed || errorState.hasError() || !dfsClient.clientRunning;
|
||||
}
|
||||
|
||||
/*
|
||||
* streamer thread is the only thread that opens streams to datanode,
|
||||
* and closes them. Any error recovery is also done by this thread.
|
||||
|
@ -385,7 +514,7 @@ class DataStreamer extends Daemon {
|
|||
TraceScope scope = NullScope.INSTANCE;
|
||||
while (!streamerClosed && dfsClient.clientRunning) {
|
||||
// if the Responder encountered an error, shutdown Responder
|
||||
if (hasError && response != null) {
|
||||
if (errorState.hasError() && response != null) {
|
||||
try {
|
||||
response.close();
|
||||
response.join();
|
||||
|
@ -398,17 +527,13 @@ class DataStreamer extends Daemon {
|
|||
DFSPacket one;
|
||||
try {
|
||||
// process datanode IO errors if any
|
||||
boolean doSleep = false;
|
||||
if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
|
||||
doSleep = processDatanodeError();
|
||||
}
|
||||
boolean doSleep = processDatanodeError();
|
||||
|
||||
final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
|
||||
synchronized (dataQueue) {
|
||||
// wait for a packet to be sent.
|
||||
long now = Time.monotonicNow();
|
||||
while ((!streamerClosed && !hasError && dfsClient.clientRunning
|
||||
&& dataQueue.size() == 0 &&
|
||||
while ((!shouldStop() && dataQueue.size() == 0 &&
|
||||
(stage != BlockConstructionStage.DATA_STREAMING ||
|
||||
stage == BlockConstructionStage.DATA_STREAMING &&
|
||||
now - lastPacket < halfSocketTimeout)) || doSleep ) {
|
||||
|
@ -424,13 +549,12 @@ class DataStreamer extends Daemon {
|
|||
doSleep = false;
|
||||
now = Time.monotonicNow();
|
||||
}
|
||||
if (streamerClosed || hasError || !dfsClient.clientRunning) {
|
||||
if (shouldStop()) {
|
||||
continue;
|
||||
}
|
||||
// get packet to be sent.
|
||||
if (dataQueue.isEmpty()) {
|
||||
one = createHeartbeatPacket();
|
||||
assert one != null;
|
||||
} else {
|
||||
try {
|
||||
backOffIfNecessary();
|
||||
|
@ -460,7 +584,7 @@ class DataStreamer extends Daemon {
|
|||
LOG.debug("Append to block " + block);
|
||||
}
|
||||
setupPipelineForAppendOrRecovery();
|
||||
if (true == streamerClosed) {
|
||||
if (streamerClosed) {
|
||||
continue;
|
||||
}
|
||||
initDataStreaming();
|
||||
|
@ -478,8 +602,7 @@ class DataStreamer extends Daemon {
|
|||
if (one.isLastPacketInBlock()) {
|
||||
// wait for all data packets have been successfully acked
|
||||
synchronized (dataQueue) {
|
||||
while (!streamerClosed && !hasError &&
|
||||
ackQueue.size() != 0 && dfsClient.clientRunning) {
|
||||
while (!shouldStop() && ackQueue.size() != 0) {
|
||||
try {
|
||||
// wait for acks to arrive from datanodes
|
||||
dataQueue.wait(1000);
|
||||
|
@ -488,7 +611,7 @@ class DataStreamer extends Daemon {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (streamerClosed || hasError || !dfsClient.clientRunning) {
|
||||
if (shouldStop()) {
|
||||
continue;
|
||||
}
|
||||
stage = BlockConstructionStage.PIPELINE_CLOSE;
|
||||
|
@ -524,7 +647,7 @@ class DataStreamer extends Daemon {
|
|||
// effect. Pipeline recovery can handle only one node error at a
|
||||
// time. If the primary node fails again during the recovery, it
|
||||
// will be taken out then.
|
||||
tryMarkPrimaryDatanodeFailed();
|
||||
errorState.markFirstNodeIfNotMarked();
|
||||
throw e;
|
||||
} finally {
|
||||
writeScope.close();
|
||||
|
@ -537,7 +660,7 @@ class DataStreamer extends Daemon {
|
|||
bytesSent = tmpBytesSent;
|
||||
}
|
||||
|
||||
if (streamerClosed || hasError || !dfsClient.clientRunning) {
|
||||
if (shouldStop()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -545,12 +668,11 @@ class DataStreamer extends Daemon {
|
|||
if (one.isLastPacketInBlock()) {
|
||||
// wait for the close packet has been acked
|
||||
synchronized (dataQueue) {
|
||||
while (!streamerClosed && !hasError &&
|
||||
ackQueue.size() != 0 && dfsClient.clientRunning) {
|
||||
while (!shouldStop() && ackQueue.size() != 0) {
|
||||
dataQueue.wait(1000);// wait for acks to arrive from datanodes
|
||||
}
|
||||
}
|
||||
if (streamerClosed || hasError || !dfsClient.clientRunning) {
|
||||
if (shouldStop()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -564,7 +686,7 @@ class DataStreamer extends Daemon {
|
|||
}
|
||||
} catch (Throwable e) {
|
||||
// Log warning if there was a real error.
|
||||
if (restartingNodeIndex.get() == -1) {
|
||||
if (!errorState.isRestartingNode()) {
|
||||
// Since their messages are descriptive enough, do not always
|
||||
// log a verbose stack-trace WARN for quota exceptions.
|
||||
if (e instanceof QuotaExceededException) {
|
||||
|
@ -575,8 +697,8 @@ class DataStreamer extends Daemon {
|
|||
}
|
||||
lastException.set(e);
|
||||
assert !(e instanceof NullPointerException);
|
||||
hasError = true;
|
||||
if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
|
||||
errorState.setError(true);
|
||||
if (!errorState.isNodeMarked()) {
|
||||
// Not a datanode issue
|
||||
streamerClosed = true;
|
||||
}
|
||||
|
@ -773,40 +895,6 @@ class DataStreamer extends Daemon {
|
|||
}
|
||||
}
|
||||
|
||||
// The following synchronized methods are used whenever
|
||||
// errorIndex or restartingNodeIndex is set. This is because
|
||||
// check & set needs to be atomic. Simply reading variables
|
||||
// does not require a synchronization. When responder is
|
||||
// not running (e.g. during pipeline recovery), there is no
|
||||
// need to use these methods.
|
||||
|
||||
/** Set the error node index. Called by responder */
|
||||
synchronized void setErrorIndex(int idx) {
|
||||
errorIndex = idx;
|
||||
}
|
||||
|
||||
/** Set the restarting node index. Called by responder */
|
||||
synchronized void setRestartingNodeIndex(int idx) {
|
||||
restartingNodeIndex.set(idx);
|
||||
// If the data streamer has already set the primary node
|
||||
// bad, clear it. It is likely that the write failed due to
|
||||
// the DN shutdown. Even if it was a real failure, the pipeline
|
||||
// recovery will take care of it.
|
||||
errorIndex = -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used when no explicit error report was received,
|
||||
* but something failed. When the primary node is a suspect or
|
||||
* unsure about the cause, the primary node is marked as failed.
|
||||
*/
|
||||
synchronized void tryMarkPrimaryDatanodeFailed() {
|
||||
// There should be no existing error and no ongoing restart.
|
||||
if ((errorIndex == -1) && (restartingNodeIndex.get() == -1)) {
|
||||
errorIndex = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Examine whether it is worth waiting for a node to restart.
|
||||
* @param index the node index
|
||||
|
@ -883,20 +971,16 @@ class DataStreamer extends Daemon {
|
|||
// the local node or the only one in the pipeline.
|
||||
if (PipelineAck.isRestartOOBStatus(reply) &&
|
||||
shouldWaitForRestart(i)) {
|
||||
restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
|
||||
+ Time.monotonicNow();
|
||||
setRestartingNodeIndex(i);
|
||||
String message = "A datanode is restarting: " + targets[i];
|
||||
LOG.info(message);
|
||||
final String message = "Datanode " + i + " is restarting: "
|
||||
+ targets[i];
|
||||
errorState.initRestartingNode(i, message);
|
||||
throw new IOException(message);
|
||||
}
|
||||
// node error
|
||||
if (reply != SUCCESS) {
|
||||
setErrorIndex(i); // first bad datanode
|
||||
errorState.setBadNodeIndex(i); // mark bad datanode
|
||||
throw new IOException("Bad response " + reply +
|
||||
" for block " + block +
|
||||
" from datanode " +
|
||||
targets[i]);
|
||||
" for " + block + " from datanode " + targets[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -954,14 +1038,12 @@ class DataStreamer extends Daemon {
|
|||
} catch (Exception e) {
|
||||
if (!responderClosed) {
|
||||
lastException.set(e);
|
||||
hasError = true;
|
||||
// If no explicit error report was received, mark the primary
|
||||
// node as failed.
|
||||
tryMarkPrimaryDatanodeFailed();
|
||||
errorState.setError(true);
|
||||
errorState.markFirstNodeIfNotMarked();
|
||||
synchronized (dataQueue) {
|
||||
dataQueue.notifyAll();
|
||||
}
|
||||
if (restartingNodeIndex.get() == -1) {
|
||||
if (!errorState.isRestartingNode()) {
|
||||
LOG.warn("Exception for " + block, e);
|
||||
}
|
||||
responderClosed = true;
|
||||
|
@ -978,11 +1060,16 @@ class DataStreamer extends Daemon {
|
|||
}
|
||||
}
|
||||
|
||||
// If this stream has encountered any errors so far, shutdown
|
||||
// threads and mark stream as closed. Returns true if we should
|
||||
// sleep for a while after returning from this call.
|
||||
//
|
||||
/**
|
||||
* If this stream has encountered any errors, shutdown threads
|
||||
* and mark the stream as closed.
|
||||
*
|
||||
* @return true if it should sleep for a while after returning.
|
||||
*/
|
||||
private boolean processDatanodeError() throws IOException {
|
||||
if (!errorState.hasDatanodeError()) {
|
||||
return false;
|
||||
}
|
||||
if (response != null) {
|
||||
LOG.info("Error Recovery for " + block +
|
||||
" waiting for responder to exit. ");
|
||||
|
@ -1064,7 +1151,7 @@ class DataStreamer extends Daemon {
|
|||
.append("The current failed datanode replacement policy is ")
|
||||
.append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
|
||||
.append("a client may configure this via '")
|
||||
.append(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY)
|
||||
.append(BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY)
|
||||
.append("' in its configuration.")
|
||||
.toString());
|
||||
}
|
||||
|
@ -1190,156 +1277,140 @@ class DataStreamer extends Daemon {
|
|||
boolean success = false;
|
||||
long newGS = 0L;
|
||||
while (!success && !streamerClosed && dfsClient.clientRunning) {
|
||||
// Sleep before reconnect if a dn is restarting.
|
||||
// This process will be repeated until the deadline or the datanode
|
||||
// starts back up.
|
||||
if (restartingNodeIndex.get() >= 0) {
|
||||
// 4 seconds or the configured deadline period, whichever is shorter.
|
||||
// This is the retry interval and recovery will be retried in this
|
||||
// interval until timeout or success.
|
||||
long delay = Math.min(dfsClient.getConf().getDatanodeRestartTimeout(),
|
||||
4000L);
|
||||
try {
|
||||
Thread.sleep(delay);
|
||||
} catch (InterruptedException ie) {
|
||||
lastException.set(new IOException("Interrupted while waiting for " +
|
||||
"datanode to restart. " + nodes[restartingNodeIndex.get()]));
|
||||
streamerClosed = true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
boolean isRecovery = hasError;
|
||||
// remove bad datanode from list of datanodes.
|
||||
// If errorIndex was not set (i.e. appends), then do not remove
|
||||
// any datanodes
|
||||
//
|
||||
if (errorIndex >= 0) {
|
||||
StringBuilder pipelineMsg = new StringBuilder();
|
||||
for (int j = 0; j < nodes.length; j++) {
|
||||
pipelineMsg.append(nodes[j]);
|
||||
if (j < nodes.length - 1) {
|
||||
pipelineMsg.append(", ");
|
||||
}
|
||||
}
|
||||
if (nodes.length <= 1) {
|
||||
lastException.set(new IOException("All datanodes " + pipelineMsg
|
||||
+ " are bad. Aborting..."));
|
||||
streamerClosed = true;
|
||||
return false;
|
||||
}
|
||||
LOG.warn("Error Recovery for block " + block +
|
||||
" in pipeline " + pipelineMsg +
|
||||
": bad datanode " + nodes[errorIndex]);
|
||||
failed.add(nodes[errorIndex]);
|
||||
|
||||
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
|
||||
arraycopy(nodes, newnodes, errorIndex);
|
||||
|
||||
final StorageType[] newStorageTypes = new StorageType[newnodes.length];
|
||||
arraycopy(storageTypes, newStorageTypes, errorIndex);
|
||||
|
||||
final String[] newStorageIDs = new String[newnodes.length];
|
||||
arraycopy(storageIDs, newStorageIDs, errorIndex);
|
||||
|
||||
setPipeline(newnodes, newStorageTypes, newStorageIDs);
|
||||
|
||||
// Just took care of a node error while waiting for a node restart
|
||||
if (restartingNodeIndex.get() >= 0) {
|
||||
// If the error came from a node further away than the restarting
|
||||
// node, the restart must have been complete.
|
||||
if (errorIndex > restartingNodeIndex.get()) {
|
||||
restartingNodeIndex.set(-1);
|
||||
} else if (errorIndex < restartingNodeIndex.get()) {
|
||||
// the node index has shifted.
|
||||
restartingNodeIndex.decrementAndGet();
|
||||
} else {
|
||||
// this shouldn't happen...
|
||||
assert false;
|
||||
}
|
||||
}
|
||||
|
||||
if (restartingNodeIndex.get() == -1) {
|
||||
hasError = false;
|
||||
}
|
||||
lastException.clear();
|
||||
errorIndex = -1;
|
||||
if (!handleRestartingDatanode()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if replace-datanode policy is satisfied.
|
||||
if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
|
||||
nodes, isAppend, isHflushed)) {
|
||||
try {
|
||||
addDatanode2ExistingPipeline();
|
||||
} catch(IOException ioe) {
|
||||
if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
|
||||
throw ioe;
|
||||
}
|
||||
LOG.warn("Failed to replace datanode."
|
||||
+ " Continue with the remaining datanodes since "
|
||||
+ HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
|
||||
+ " is set to true.", ioe);
|
||||
}
|
||||
final boolean isRecovery = errorState.hasError();
|
||||
if (!handleBadDatanode()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
handleDatanodeReplacement();
|
||||
|
||||
// get a new generation stamp and an access token
|
||||
LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
|
||||
final LocatedBlock lb = updateBlockForPipeline();
|
||||
newGS = lb.getBlock().getGenerationStamp();
|
||||
accessToken = lb.getBlockToken();
|
||||
|
||||
// set up the pipeline again with the remaining nodes
|
||||
if (failPacket) { // for testing
|
||||
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
|
||||
failPacket = false;
|
||||
try {
|
||||
// Give DNs time to send in bad reports. In real situations,
|
||||
// good reports should follow bad ones, if client committed
|
||||
// with those nodes.
|
||||
Thread.sleep(2000);
|
||||
} catch (InterruptedException ie) {}
|
||||
} else {
|
||||
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
|
||||
}
|
||||
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
|
||||
|
||||
if (restartingNodeIndex.get() >= 0) {
|
||||
assert hasError == true;
|
||||
// check errorIndex set above
|
||||
if (errorIndex == restartingNodeIndex.get()) {
|
||||
// ignore, if came from the restarting node
|
||||
errorIndex = -1;
|
||||
}
|
||||
// still within the deadline
|
||||
if (Time.monotonicNow() < restartDeadline) {
|
||||
continue; // with in the deadline
|
||||
}
|
||||
// expired. declare the restarting node dead
|
||||
restartDeadline = 0;
|
||||
int expiredNodeIndex = restartingNodeIndex.get();
|
||||
restartingNodeIndex.set(-1);
|
||||
LOG.warn("Datanode did not restart in time: " +
|
||||
nodes[expiredNodeIndex]);
|
||||
// Mark the restarting node as failed. If there is any other failed
|
||||
// node during the last pipeline construction attempt, it will not be
|
||||
// overwritten/dropped. In this case, the restarting node will get
|
||||
// excluded in the following attempt, if it still does not come up.
|
||||
if (errorIndex == -1) {
|
||||
errorIndex = expiredNodeIndex;
|
||||
}
|
||||
// From this point on, normal pipeline recovery applies.
|
||||
}
|
||||
failPacket4Testing();
|
||||
|
||||
errorState.checkRestartingNodeDeadline(nodes);
|
||||
} // while
|
||||
|
||||
if (success) {
|
||||
// update pipeline at the namenode
|
||||
ExtendedBlock newBlock = new ExtendedBlock(
|
||||
block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
|
||||
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
|
||||
nodes, storageIDs);
|
||||
// update client side generation stamp
|
||||
block = newBlock;
|
||||
block = updatePipeline(newGS);
|
||||
}
|
||||
return false; // do not sleep, continue processing
|
||||
}
|
||||
|
||||
/**
|
||||
* Sleep if a node is restarting.
|
||||
* This process is repeated until the deadline or the node starts back up.
|
||||
* @return true if it should continue.
|
||||
*/
|
||||
private boolean handleRestartingDatanode() {
|
||||
if (errorState.isRestartingNode()) {
|
||||
// 4 seconds or the configured deadline period, whichever is shorter.
|
||||
// This is the retry interval and recovery will be retried in this
|
||||
// interval until timeout or success.
|
||||
final long delay = Math.min(errorState.datanodeRestartTimeout, 4000L);
|
||||
try {
|
||||
Thread.sleep(delay);
|
||||
} catch (InterruptedException ie) {
|
||||
lastException.set(new IOException(
|
||||
"Interrupted while waiting for restarting "
|
||||
+ nodes[errorState.getRestartingNodeIndex()]));
|
||||
streamerClosed = true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove bad node from list of nodes if badNodeIndex was set.
|
||||
* @return true if it should continue.
|
||||
*/
|
||||
private boolean handleBadDatanode() {
|
||||
final int badNodeIndex = errorState.getBadNodeIndex();
|
||||
if (badNodeIndex >= 0) {
|
||||
if (nodes.length <= 1) {
|
||||
lastException.set(new IOException("All datanodes "
|
||||
+ Arrays.toString(nodes) + " are bad. Aborting..."));
|
||||
streamerClosed = true;
|
||||
return false;
|
||||
}
|
||||
|
||||
LOG.warn("Error Recovery for " + block + " in pipeline "
|
||||
+ Arrays.toString(nodes) + ": datanode " + badNodeIndex
|
||||
+ "("+ nodes[badNodeIndex] + ") is bad.");
|
||||
failed.add(nodes[badNodeIndex]);
|
||||
|
||||
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
|
||||
arraycopy(nodes, newnodes, badNodeIndex);
|
||||
|
||||
final StorageType[] newStorageTypes = new StorageType[newnodes.length];
|
||||
arraycopy(storageTypes, newStorageTypes, badNodeIndex);
|
||||
|
||||
final String[] newStorageIDs = new String[newnodes.length];
|
||||
arraycopy(storageIDs, newStorageIDs, badNodeIndex);
|
||||
|
||||
setPipeline(newnodes, newStorageTypes, newStorageIDs);
|
||||
|
||||
errorState.adjustState4RestartingNode();
|
||||
lastException.clear();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Add a datanode if replace-datanode policy is satisfied. */
|
||||
private void handleDatanodeReplacement() throws IOException {
|
||||
if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
|
||||
nodes, isAppend, isHflushed)) {
|
||||
try {
|
||||
addDatanode2ExistingPipeline();
|
||||
} catch(IOException ioe) {
|
||||
if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
|
||||
throw ioe;
|
||||
}
|
||||
LOG.warn("Failed to replace datanode."
|
||||
+ " Continue with the remaining datanodes since "
|
||||
+ BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
|
||||
+ " is set to true.", ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void failPacket4Testing() {
|
||||
if (failPacket) { // for testing
|
||||
failPacket = false;
|
||||
try {
|
||||
// Give DNs time to send in bad reports. In real situations,
|
||||
// good reports should follow bad ones, if client committed
|
||||
// with those nodes.
|
||||
Thread.sleep(2000);
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
}
|
||||
|
||||
LocatedBlock updateBlockForPipeline() throws IOException {
|
||||
return dfsClient.namenode.updateBlockForPipeline(
|
||||
block, dfsClient.clientName);
|
||||
}
|
||||
|
||||
/** update pipeline at the namenode */
|
||||
ExtendedBlock updatePipeline(long newGS) throws IOException {
|
||||
final ExtendedBlock newBlock = new ExtendedBlock(
|
||||
block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
|
||||
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
|
||||
nodes, storageIDs);
|
||||
return newBlock;
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a DataStreamer to a DataNode so that it can be written to.
|
||||
* This happens when a file is created and each time a new block is allocated.
|
||||
|
@ -1354,9 +1425,8 @@ class DataStreamer extends Daemon {
|
|||
boolean success = false;
|
||||
ExtendedBlock oldBlock = block;
|
||||
do {
|
||||
hasError = false;
|
||||
errorState.reset();
|
||||
lastException.clear();
|
||||
errorIndex = -1;
|
||||
success = false;
|
||||
|
||||
DatanodeInfo[] excluded =
|
||||
|
@ -1382,8 +1452,9 @@ class DataStreamer extends Daemon {
|
|||
dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
|
||||
dfsClient.clientName);
|
||||
block = null;
|
||||
LOG.info("Excluding datanode " + nodes[errorIndex]);
|
||||
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
|
||||
final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
|
||||
LOG.info("Excluding datanode " + badNode);
|
||||
excludedNodes.put(badNode, badNode);
|
||||
}
|
||||
} while (!success && --count >= 0);
|
||||
|
||||
|
@ -1464,7 +1535,7 @@ class DataStreamer extends Daemon {
|
|||
// from the local datanode. Thus it is safe to treat this as a
|
||||
// regular node error.
|
||||
if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
|
||||
restartingNodeIndex.get() == -1) {
|
||||
!errorState.isRestartingNode()) {
|
||||
checkRestart = true;
|
||||
throw new IOException("A datanode is restarting.");
|
||||
}
|
||||
|
@ -1475,10 +1546,9 @@ class DataStreamer extends Daemon {
|
|||
assert null == blockStream : "Previous blockStream unclosed";
|
||||
blockStream = out;
|
||||
result = true; // success
|
||||
restartingNodeIndex.set(-1);
|
||||
hasError = false;
|
||||
errorState.reset();
|
||||
} catch (IOException ie) {
|
||||
if (restartingNodeIndex.get() == -1) {
|
||||
if (!errorState.isRestartingNode()) {
|
||||
LOG.info("Exception in createBlockOutputStream", ie);
|
||||
}
|
||||
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
|
||||
|
@ -1498,24 +1568,21 @@ class DataStreamer extends Daemon {
|
|||
for (int i = 0; i < nodes.length; i++) {
|
||||
// NB: Unconditionally using the xfer addr w/o hostname
|
||||
if (firstBadLink.equals(nodes[i].getXferAddr())) {
|
||||
errorIndex = i;
|
||||
errorState.setBadNodeIndex(i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
assert checkRestart == false;
|
||||
errorIndex = 0;
|
||||
errorState.setBadNodeIndex(0);
|
||||
}
|
||||
|
||||
final int i = errorState.getBadNodeIndex();
|
||||
// Check whether there is a restart worth waiting for.
|
||||
if (checkRestart && shouldWaitForRestart(errorIndex)) {
|
||||
restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
|
||||
+ Time.monotonicNow();
|
||||
restartingNodeIndex.set(errorIndex);
|
||||
errorIndex = -1;
|
||||
LOG.info("Waiting for the datanode to be restarted: " +
|
||||
nodes[restartingNodeIndex.get()]);
|
||||
if (checkRestart && shouldWaitForRestart(i)) {
|
||||
errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]);
|
||||
}
|
||||
hasError = true;
|
||||
errorState.setError(true);
|
||||
lastException.set(ie);
|
||||
result = false; // error
|
||||
} finally {
|
||||
|
@ -1699,10 +1766,10 @@ class DataStreamer extends Daemon {
|
|||
return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, 0, false);
|
||||
}
|
||||
|
||||
private LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes() {
|
||||
return CacheBuilder.newBuilder().expireAfterWrite(
|
||||
dfsClient.getConf().getExcludedNodesCacheExpiry(),
|
||||
TimeUnit.MILLISECONDS)
|
||||
private static LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes(
|
||||
long excludedNodesCacheExpiry) {
|
||||
return CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(excludedNodesCacheExpiry, TimeUnit.MILLISECONDS)
|
||||
.removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
|
||||
@Override
|
||||
public void onRemoval(
|
||||
|
|
Loading…
Reference in New Issue