HDFS-5522. Datanode disk error check may be incorrectly skipped. Contributed by Rushabh Shah.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1594055 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2014-05-12 19:08:06 +00:00
parent 09002d9341
commit e9459baec5
4 changed files with 92 additions and 77 deletions

View File

@ -452,6 +452,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6351. Command hdfs dfs -rm -r can't remove empty directory.
(Yongjun Zhang via wang)
HDFS-5522. Datanode disk error check may be incorrectly skipped.
(Rushabh S Shah via kihwal)
Release 2.4.1 - UNRELEASED Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -248,7 +248,7 @@ class BlockReceiver implements Closeable {
if (cause != null) { // possible disk error if (cause != null) { // possible disk error
ioe = cause; ioe = cause;
datanode.checkDiskError(ioe); // may throw an exception here datanode.checkDiskError();
} }
throw ioe; throw ioe;
@ -324,7 +324,7 @@ class BlockReceiver implements Closeable {
} }
// disk check // disk check
if(ioe != null) { if(ioe != null) {
datanode.checkDiskError(ioe); datanode.checkDiskError();
throw ioe; throw ioe;
} }
} }
@ -615,7 +615,7 @@ class BlockReceiver implements Closeable {
manageWriterOsCache(offsetInBlock); manageWriterOsCache(offsetInBlock);
} }
} catch (IOException iex) { } catch (IOException iex) {
datanode.checkDiskError(iex); datanode.checkDiskError();
throw iex; throw iex;
} }
} }
@ -1171,11 +1171,7 @@ class BlockReceiver implements Closeable {
      } catch (IOException e) {
        LOG.warn("IOException in BlockReceiver.run(): ", e);
        if (running) {
-         try {
-           datanode.checkDiskError(e); // may throw an exception here
-         } catch (IOException ioe) {
-           LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
-         }
+         datanode.checkDiskError();
          LOG.info(myString, e);
          running = false;
          if (!Thread.interrupted()) { // failure not caused by interruption

View File

@ -84,7 +84,6 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.*; import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.mortbay.util.ajax.JSON; import org.mortbay.util.ajax.JSON;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -92,8 +91,6 @@ import javax.management.ObjectName;
import java.io.*; import java.io.*;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.net.*; import java.net.*;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.*; import java.util.*;
@ -229,6 +226,11 @@ public class DataNode extends Configured
ReadaheadPool readaheadPool; ReadaheadPool readaheadPool;
private final boolean getHdfsBlockLocationsEnabled; private final boolean getHdfsBlockLocationsEnabled;
private ObjectName dataNodeInfoBeanName; private ObjectName dataNodeInfoBeanName;
private Thread checkDiskErrorThread = null;
protected final int checkDiskErrorInterval = 5*1000;
private boolean checkDiskErrorFlag = false;
private Object checkDiskErrorMutex = new Object();
private long lastDiskErrorCheck;
/** /**
* Create the DataNode given a configuration, an array of dataDirs, * Create the DataNode given a configuration, an array of dataDirs,
@ -238,6 +240,7 @@ public class DataNode extends Configured
final List<StorageLocation> dataDirs, final List<StorageLocation> dataDirs,
final SecureResources resources) throws IOException { final SecureResources resources) throws IOException {
super(conf); super(conf);
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
@ -1212,6 +1215,11 @@ public class DataNode extends Configured
this.dataXceiverServer.interrupt(); this.dataXceiverServer.interrupt();
} }
// Interrupt the checkDiskErrorThread and terminate it.
if(this.checkDiskErrorThread != null) {
this.checkDiskErrorThread.interrupt();
}
// Record the time of initial notification // Record the time of initial notification
long timeNotified = Time.now(); long timeNotified = Time.now();
@ -1321,55 +1329,17 @@ public class DataNode extends Configured
} }
/** Check if there is no space in disk
* @param e that caused this checkDiskError call
**/
protected void checkDiskError(Exception e ) throws IOException {
LOG.warn("checkDiskError: exception: ", e);
if (isNetworkRelatedException(e)) {
LOG.info("Not checking disk as checkDiskError was called on a network" +
" related exception");
return;
}
if (e.getMessage() != null &&
e.getMessage().startsWith("No space left on device")) {
throw new DiskOutOfSpaceException("No space left on device");
} else {
checkDiskError();
}
}
/**
* Check if the provided exception looks like it's from a network error
* @param e the exception from a checkDiskError call
* @return true if this exception is network related, false otherwise
*/
protected boolean isNetworkRelatedException(Exception e) {
if (e instanceof SocketException
|| e instanceof SocketTimeoutException
|| e instanceof ClosedChannelException
|| e instanceof ClosedByInterruptException) {
return true;
}
String msg = e.getMessage();
return null != msg
&& (msg.startsWith("An established connection was aborted")
|| msg.startsWith("Broken pipe")
|| msg.startsWith("Connection reset")
|| msg.contains("java.nio.channels.SocketChannel"));
}
/** /**
* Check if there is a disk failure and if so, handle the error * Check if there is a disk failure and if so, handle the error
*/ */
- public void checkDiskError() {
-   try {
-     data.checkDataDir();
-   } catch (DiskErrorException de) {
-     handleDiskError(de.getMessage());
-   }
- }
+ public void checkDiskError() {
+   synchronized(checkDiskErrorMutex) {
+     checkDiskErrorFlag = true;
+     if(checkDiskErrorThread == null) {
+       startCheckDiskErrorThread();
+       checkDiskErrorThread.start();
+       LOG.info("Starting CheckDiskError Thread");
+     }
+   }
+ }
@ -1670,12 +1640,7 @@ public class DataNode extends Configured
        LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
            targets[0] + " got ", ie);
        // check if there are any disk problem
-       try{
-         checkDiskError(ie);
-       } catch(IOException e) {
-         LOG.warn("DataNode.checkDiskError failed in run() with: ", e);
-       }
+       checkDiskError();
      } finally {
        xmitsInProgress.getAndDecrement();
        IOUtils.closeStream(blockSender);
@ -2590,4 +2555,50 @@ public class DataNode extends Configured
public ShortCircuitRegistry getShortCircuitRegistry() { public ShortCircuitRegistry getShortCircuitRegistry() {
return shortCircuitRegistry; return shortCircuitRegistry;
} }
/**
* Starts a new thread which will check for disk error check request
* every 5 sec
*/
/**
 * Starts a background thread that services disk-error check requests.
 * The thread wakes every {@code checkDiskErrorInterval} ms; if
 * {@link #checkDiskError()} has set {@code checkDiskErrorFlag} since the
 * last pass, it runs {@code data.checkDataDir()} and records the
 * completion time in {@code lastDiskErrorCheck}.
 */
private void startCheckDiskErrorThread() {
  checkDiskErrorThread = new Thread(new Runnable() {
    @Override
    public void run() {
      while (shouldRun) {
        boolean tempFlag;
        // Atomically consume any pending check request so a request
        // arriving during checkDataDir() is handled on the next pass.
        synchronized (checkDiskErrorMutex) {
          tempFlag = checkDiskErrorFlag;
          checkDiskErrorFlag = false;
        }
        if (tempFlag) {
          try {
            data.checkDataDir();
          } catch (DiskErrorException de) {
            handleDiskError(de.getMessage());
          } catch (Exception e) {
            // Pass the exception as the throwable argument so the full
            // stack trace is logged, not just e.toString().
            LOG.warn("Unexpected exception occurred while checking disk error", e);
            // Null the reference under the mutex so checkDiskError()'s
            // null-check (also done under the mutex) can restart us.
            synchronized (checkDiskErrorMutex) {
              checkDiskErrorThread = null;
            }
            return;
          }
          synchronized (checkDiskErrorMutex) {
            lastDiskErrorCheck = System.currentTimeMillis();
          }
        }
        try {
          Thread.sleep(checkDiskErrorInterval);
        } catch (InterruptedException e) {
          LOG.debug("InterruptedException in check disk error thread", e);
          synchronized (checkDiskErrorMutex) {
            checkDiskErrorThread = null;
          }
          return;
        }
      }
    }
  });
  // Daemon + named: a pending check must not keep the JVM alive, and the
  // thread should be identifiable in thread dumps.
  checkDiskErrorThread.setDaemon(true);
  checkDiskErrorThread.setName("DataNode DiskChecker thread");
}
/**
 * Returns the wall-clock time (milliseconds since the epoch) at which the
 * most recent disk-error check completed, read under the check mutex.
 */
public long getLastDiskErrorCheck() {
  final long snapshot;
  synchronized (checkDiskErrorMutex) {
    snapshot = lastDiskErrorCheck;
  }
  return snapshot;
}
} }

View File

@ -18,16 +18,13 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -201,15 +198,23 @@ public class TestDiskError {
} }
} }
/**
 * Checks whether {@link DataNode#checkDiskError()} is being called or not.
 * Before this refactoring, the above function was never getting called.
 * @throws IOException
 * @throws InterruptedException
 */
  @Test
- public void testNetworkErrorsIgnored() {
-   DataNode dn = cluster.getDataNodes().iterator().next();
-   assertTrue(dn.isNetworkRelatedException(new SocketException()));
-   assertTrue(dn.isNetworkRelatedException(new SocketTimeoutException()));
-   assertTrue(dn.isNetworkRelatedException(new ClosedChannelException()));
-   assertTrue(dn.isNetworkRelatedException(new Exception("Broken pipe foo bar")));
-   assertFalse(dn.isNetworkRelatedException(new Exception()));
-   assertFalse(dn.isNetworkRelatedException(new Exception("random problem")));
- }
+ public void testcheckDiskError() throws IOException, InterruptedException {
+   if(cluster.getDataNodes().size() <= 0) {
+     cluster.startDataNodes(conf, 1, true, null, null);
+     cluster.waitActive();
+   }
+   DataNode dataNode = cluster.getDataNodes().get(0);
+   long slackTime = dataNode.checkDiskErrorInterval/2;
+   //checking for disk error
+   dataNode.checkDiskError();
+   Thread.sleep(dataNode.checkDiskErrorInterval);
+   long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck();
+   assertTrue("Disk Error check is not performed within " + dataNode.checkDiskErrorInterval + " ms", ((System.currentTimeMillis()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime)));
+ }
} }