diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 62f73fcdf7c..e527c3d1120 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -366,6 +366,9 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4521. Invalid network toploogies should not be cached. (Colin Patrick
     McCabe via atm)
 
+    HDFS-4246. The exclude node list should be more forgiving, for each output
+    stream. (harsh via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 15db7332287..92287789959 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -41,6 +41,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPAC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@@ -212,6 +214,7 @@ public class DFSClient implements java.io.Closeable {
     final int socketTimeout;
     final int socketCacheCapacity;
     final long socketCacheExpiry;
+    final long excludedNodesCacheExpiry;
     /** Wait time window (in msec) if BlockMissingException is caught */
     final int timeWindow;
     final int nCachedConnRetry;
@@ -262,6 +265,9 @@ public class DFSClient implements java.io.Closeable {
           DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
       socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
           DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
+      excludedNodesCacheExpiry = conf.getLong(
+          DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
+          DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
       prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
           10 * defaultBlockSize);
       timeWindow = conf
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 034c4b50b8e..4d3b6733eeb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -76,6 +76,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
   public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
+  public static final String DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL = "dfs.client.write.exclude.nodes.cache.expiry.interval.millis";
+  public static final long DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10 * 60 * 1000; // 10 minutes, in ms
   public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
   public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 1edd2fe746f..a255881a497 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -35,6 +35,7 @@ import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -82,6 +83,11 @@ import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 
 /****************************************************************
@@ -289,7 +295,25 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
     private DataInputStream blockReplyStream;
     private ResponseProcessor response = null;
     private volatile DatanodeInfo[] nodes = null; // list of targets for current block
-    private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
+    private LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
+        CacheBuilder.newBuilder()
+        .expireAfterWrite(
+            dfsClient.getConf().excludedNodesCacheExpiry,
+            TimeUnit.MILLISECONDS)
+        .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
+          @Override
+          public void onRemoval(
+              RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
+            DFSClient.LOG.info("Removing node " +
+                notification.getKey() + " from the excluded nodes list");
+          }
+        })
+        .build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
+          @Override
+          public DatanodeInfo load(DatanodeInfo key) throws Exception {
+            return key;
+          }
+        });
     volatile boolean hasError = false;
     volatile int errorIndex = -1;
     private BlockConstructionStage stage;  // block construction stage
@@ -999,8 +1023,10 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
         success = false;
 
         long startTime = Time.now();
-        DatanodeInfo[] excluded = excludedNodes.toArray(
-            new DatanodeInfo[excludedNodes.size()]);
+        DatanodeInfo[] excluded =
+            excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
+            .keySet()
+            .toArray(new DatanodeInfo[0]);
         block = oldBlock;
         lb = locateFollowingBlock(startTime,
             excluded.length > 0 ? excluded : null);
@@ -1019,7 +1045,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
             dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
             block = null;
             DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
-            excludedNodes.add(nodes[errorIndex]);
+            excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
           }
         } while (!success && --count >= 0);
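The core of the change above swaps DFSOutputStream's per-stream ArrayList of excluded datanodes for a Guava LoadingCache whose entries expire (are "forgiven") after dfs.client.write.exclude.nodes.cache.expiry.interval.millis. The standalone sketch below is not part of the patch; the class name is made up, and it uses String keys with a one-second expiry purely to illustrate the expire-after-write behaviour and the getAllPresent(asMap().keySet()) snapshot idiom the patch relies on.

import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

public class ExpiringExcludeListSketch {                  // made-up class name
  public static void main(String[] args) throws Exception {
    // Entries are forgiven one second after they were written, mirroring
    // what the patch does with dfsClient.getConf().excludedNodesCacheExpiry.
    LoadingCache<String, String> excluded = CacheBuilder.newBuilder()
        .expireAfterWrite(1000, TimeUnit.MILLISECONDS)
        .removalListener(new RemovalListener<String, String>() {
          @Override
          public void onRemoval(RemovalNotification<String, String> n) {
            System.out.println("Forgiving node " + n.getKey());
          }
        })
        .build(new CacheLoader<String, String>() {
          @Override
          public String load(String key) {
            return key; // identity mapping: the cache acts as an expiring set
          }
        });

    // A write failure against a datanode would add it to the exclude list.
    excluded.put("dn1:50010", "dn1:50010");

    // Snapshot of the currently excluded nodes, same idiom as the patch.
    Set<String> snapshot =
        excluded.getAllPresent(excluded.asMap().keySet()).keySet();
    System.out.println("Excluded now: " + snapshot);      // [dn1:50010]

    Thread.sleep(1500);
    excluded.cleanUp(); // trigger pending evictions so the listener fires
    System.out.println("Excluded later: "
        + excluded.getAllPresent(excluded.asMap().keySet()).keySet()); // []
  }
}

Because the CacheLoader maps each key to itself, the cache behaves like an expiring set: getAllPresent skips entries older than the expiry, so a restarted datanode becomes eligible for new block allocations again without any explicit cleanup by the writer.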
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index ef8b237c1e0..45424c52e28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -594,6 +594,17 @@
   <description>Packet size for clients to write</description>
 </property>
 
+<property>
+  <name>dfs.client.write.exclude.nodes.cache.expiry.interval.millis</name>
+  <value>600000</value>
+  <description>The maximum period to keep a DN in the excluded nodes list
+  at a client. After this period, in milliseconds, the previously excluded node(s) will
+  be removed automatically from the cache and will be considered good for block allocations
+  again. Useful to lower or raise in situations where you keep a file open for very long
+  periods (such as a Write-Ahead-Log (WAL) file) to make the writer tolerant to cluster maintenance
+  restarts. Defaults to 10 minutes.</description>
+</property>
+
 <property>
   <name>dfs.namenode.checkpoint.dir</name>
   <value>file://${hadoop.tmp.dir}/dfs/namesecondary</value>
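The description above is aimed at long-lived writers, e.g. a write-ahead log that stays open across datanode restarts. As a rough illustration only (the class name, path, and one-minute value below are made up, not part of the patch), such a client could override the expiry through its Configuration before opening the stream; whether to lower or raise it depends on how quickly restarted datanodes should be trusted again.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class WalWriterConfigSketch {                      // hypothetical client
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Forgive a previously failed datanode after 1 minute instead of the
    // default 10 minutes (600000 ms); the value is illustrative only.
    conf.setLong(
        DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
        60 * 1000L);

    // Assumes fs.defaultFS points at the target HDFS cluster.
    FileSystem fs = FileSystem.get(conf);
    Path wal = new Path("/wals/region-0001.wal");         // made-up path
    FSDataOutputStream out = fs.create(wal);
    out.writeBytes("edit-0001\n");
    out.hflush(); // a real WAL writer would keep this stream open for hours
    out.close();
    fs.close();
  }
}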
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
index dccc82f1a42..fa88e394577 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
@@ -21,20 +21,27 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.List;
+
+import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.util.ThreadUtil;
+
 import org.junit.Test;
 
 /**
- * These tests make sure that DFSClient retries fetching data from DFS
- * properly in case of errors.
+ * These tests make sure that DFSClient excludes writing data to
+ * a DN properly in case of errors.
  */
 public class TestDFSClientExcludedNodes {
 
-  @Test
+  @Test(timeout=10000)
   public void testExcludedNodes() throws IOException {
     Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
@@ -43,14 +50,89 @@ public class TestDFSClientExcludedNodes {
 
     // kill a datanode
     cluster.stopDataNode(AppendTestUtil.nextInt(3));
-    OutputStream out = fs.create(filePath, true, 4096);
+    OutputStream out = fs.create(
+        filePath,
+        true,
+        4096,
+        (short) 3,
+        fs.getDefaultBlockSize(filePath)
+    );
     out.write(20);
 
     try {
       out.close();
     } catch (Exception e) {
-      fail("DataNode failure should not result in a block abort: \n" + e.getMessage());
+      fail("Single DN failure should not result in a block abort: \n"
+          + e.getMessage());
+    }
+  }
+
+  @Test(timeout=10000)
+  public void testExcludedNodesForgiveness() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    // Forgive nodes in under 1s for this test case.
+    conf.setLong(
+        DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
+        1000);
+    // We'll be using a 512 bytes block size just for tests
+    // so making sure the checksum bytes too match it.
+    conf.setInt("io.bytes.per.checksum", 512);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    List<DataNodeProperties> props = cluster.dataNodes;
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testForgivingExcludedNodes");
+
+    // 256 bytes data chunk for writes
+    byte[] bytes = new byte[256];
+    for (int index=0; index<bytes.length; index++) {