From 5ee9c6f28a87c6888d3f59325b1380fc21dce148 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Mon, 12 May 2014 19:09:42 +0000 Subject: [PATCH] svn merge -c 1594055 merging from trunk to branch-2 to fix:HDFS-5522. Datanode disk error check may be incorrectly skipped. Contributed by Rushabh Shah. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1594056 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/server/datanode/BlockReceiver.java | 12 +- .../hadoop/hdfs/server/datanode/DataNode.java | 123 ++++++++++-------- .../hdfs/server/datanode/TestDiskError.java | 31 +++-- 4 files changed, 92 insertions(+), 77 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ca407aab300..6adcc808301 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -200,6 +200,9 @@ Release 2.5.0 - UNRELEASED HDFS-6351. Command hdfs dfs -rm -r can't remove empty directory. (Yongjun Zhang via wang) + HDFS-5522. Datanode disk error check may be incorrectly skipped. + (Rushabh S Shah via kihwal) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 0e77d286bec..a7643b571e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -248,7 +248,7 @@ class BlockReceiver implements Closeable { if (cause != null) { // possible disk error ioe = cause; - datanode.checkDiskError(ioe); // may throw an exception here + datanode.checkDiskError(); } throw ioe; @@ -324,7 +324,7 @@ class BlockReceiver implements Closeable { } // disk check if(ioe != null) { - datanode.checkDiskError(ioe); + datanode.checkDiskError(); throw ioe; } } @@ -615,7 +615,7 @@ class BlockReceiver implements Closeable { manageWriterOsCache(offsetInBlock); } } catch (IOException iex) { - datanode.checkDiskError(iex); + datanode.checkDiskError(); throw iex; } } @@ -1171,11 +1171,7 @@ class BlockReceiver implements Closeable { } catch (IOException e) { LOG.warn("IOException in BlockReceiver.run(): ", e); if (running) { - try { - datanode.checkDiskError(e); // may throw an exception here - } catch (IOException ioe) { - LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe); - } + datanode.checkDiskError(); LOG.info(myString, e); running = false; if (!Thread.interrupted()) { // failure not caused by interruption diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index bf06b18766f..7e22e4d1a05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -87,7 +87,6 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.*; import org.apache.hadoop.util.DiskChecker.DiskErrorException; -import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.mortbay.util.ajax.JSON; import javax.management.ObjectName; @@ -95,8 +94,6 @@ import javax.management.ObjectName; import java.io.*; import java.lang.management.ManagementFactory; import java.net.*; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.ClosedChannelException; import java.nio.channels.SocketChannel; import java.security.PrivilegedExceptionAction; import java.util.*; @@ -232,6 +229,11 @@ public class DataNode extends Configured ReadaheadPool readaheadPool; private final boolean getHdfsBlockLocationsEnabled; private ObjectName dataNodeInfoBeanName; + private Thread checkDiskErrorThread = null; + protected final int checkDiskErrorInterval = 5*1000; + private boolean checkDiskErrorFlag = false; + private Object checkDiskErrorMutex = new Object(); + private long lastDiskErrorCheck; /** * Create the DataNode given a configuration, an array of dataDirs, @@ -241,6 +243,7 @@ public class DataNode extends Configured final List dataDirs, final SecureResources resources) throws IOException { super(conf); + this.lastDiskErrorCheck = 0; this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); @@ -1219,6 +1222,11 @@ public class DataNode extends Configured this.dataXceiverServer.interrupt(); } + // Interrupt the checkDiskErrorThread and terminate it. + if(this.checkDiskErrorThread != null) { + this.checkDiskErrorThread.interrupt(); + } + // Record the time of initial notification long timeNotified = Time.now(); @@ -1328,55 +1336,17 @@ public class DataNode extends Configured } - /** Check if there is no space in disk - * @param e that caused this checkDiskError call - **/ - protected void checkDiskError(Exception e ) throws IOException { - - LOG.warn("checkDiskError: exception: ", e); - if (isNetworkRelatedException(e)) { - LOG.info("Not checking disk as checkDiskError was called on a network" + - " related exception"); - return; - } - if (e.getMessage() != null && - e.getMessage().startsWith("No space left on device")) { - throw new DiskOutOfSpaceException("No space left on device"); - } else { - checkDiskError(); - } - } - - /** - * Check if the provided exception looks like it's from a network error - * @param e the exception from a checkDiskError call - * @return true if this exception is network related, false otherwise - */ - protected boolean isNetworkRelatedException(Exception e) { - if (e instanceof SocketException - || e instanceof SocketTimeoutException - || e instanceof ClosedChannelException - || e instanceof ClosedByInterruptException) { - return true; - } - - String msg = e.getMessage(); - - return null != msg - && (msg.startsWith("An established connection was aborted") - || msg.startsWith("Broken pipe") - || msg.startsWith("Connection reset") - || msg.contains("java.nio.channels.SocketChannel")); - } - /** * Check if there is a disk failure and if so, handle the error */ public void checkDiskError() { - try { - data.checkDataDir(); - } catch (DiskErrorException de) { - handleDiskError(de.getMessage()); + synchronized(checkDiskErrorMutex) { + checkDiskErrorFlag = true; + if(checkDiskErrorThread == null) { + startCheckDiskErrorThread(); + checkDiskErrorThread.start(); + LOG.info("Starting CheckDiskError Thread"); + } } } @@ -1676,13 +1646,8 @@ public class DataNode extends Configured } catch (IOException ie) { LOG.warn(bpReg + ":Failed to transfer " + b + " to " + targets[0] + " got ", ie); - // check if there are any disk problem - try{ - checkDiskError(ie); - } catch(IOException e) { - LOG.warn("DataNode.checkDiskError failed in run() with: ", e); - } - + // check if there are any disk problem + checkDiskError(); } finally { xmitsInProgress.getAndDecrement(); IOUtils.closeStream(blockSender); @@ -2597,4 +2562,50 @@ public class DataNode extends Configured public ShortCircuitRegistry getShortCircuitRegistry() { return shortCircuitRegistry; } -} + + /** + * Starts a new thread which will check for disk error check request + * every 5 sec + */ + private void startCheckDiskErrorThread() { + checkDiskErrorThread = new Thread(new Runnable() { + @Override + public void run() { + while(shouldRun) { + boolean tempFlag ; + synchronized(checkDiskErrorMutex) { + tempFlag = checkDiskErrorFlag; + checkDiskErrorFlag = false; + } + if(tempFlag) { + try { + data.checkDataDir(); + } catch (DiskErrorException de) { + handleDiskError(de.getMessage()); + } catch (Exception e) { + LOG.warn("Unexpected exception occurred while checking disk error " + e); + checkDiskErrorThread = null; + return; + } + synchronized(checkDiskErrorMutex) { + lastDiskErrorCheck = System.currentTimeMillis(); + } + } + try { + Thread.sleep(checkDiskErrorInterval); + } catch (InterruptedException e) { + LOG.debug("InterruptedException in check disk error thread", e); + checkDiskErrorThread = null; + return; + } + } + } + }); + } + + public long getLastDiskErrorCheck() { + synchronized(checkDiskErrorMutex) { + return lastDiskErrorCheck; + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java index e36005bafc6..681ed324e67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java @@ -18,16 +18,13 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.DataOutputStream; import java.io.File; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.nio.channels.ClosedChannelException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -201,15 +198,23 @@ public class TestDiskError { } } + /** + * Checks whether {@link DataNode#checkDiskError()} is being called or not. + * Before refactoring the code the above function was not getting called + * @throws IOException, InterruptedException + */ @Test - public void testNetworkErrorsIgnored() { - DataNode dn = cluster.getDataNodes().iterator().next(); - - assertTrue(dn.isNetworkRelatedException(new SocketException())); - assertTrue(dn.isNetworkRelatedException(new SocketTimeoutException())); - assertTrue(dn.isNetworkRelatedException(new ClosedChannelException())); - assertTrue(dn.isNetworkRelatedException(new Exception("Broken pipe foo bar"))); - assertFalse(dn.isNetworkRelatedException(new Exception())); - assertFalse(dn.isNetworkRelatedException(new Exception("random problem"))); + public void testcheckDiskError() throws IOException, InterruptedException { + if(cluster.getDataNodes().size() <= 0) { + cluster.startDataNodes(conf, 1, true, null, null); + cluster.waitActive(); + } + DataNode dataNode = cluster.getDataNodes().get(0); + long slackTime = dataNode.checkDiskErrorInterval/2; + //checking for disk error + dataNode.checkDiskError(); + Thread.sleep(dataNode.checkDiskErrorInterval); + long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck(); + assertTrue("Disk Error check is not performed within " + dataNode.checkDiskErrorInterval + " ms", ((System.currentTimeMillis()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime))); } }