HDFS-4246. The exclude node list should be more forgiving, for each output stream. Contributed by Harsh J. Chouraria.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1459475 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2013-03-21 18:58:06 +00:00
parent 3574720017
commit a10fbb93e7
6 changed files with 140 additions and 10 deletions

View File

@ -366,6 +366,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4521. Invalid network toploogies should not be cached. (Colin Patrick HDFS-4521. Invalid network toploogies should not be cached. (Colin Patrick
McCabe via atm) McCabe via atm)
HDFS-4246. The exclude node list should be more forgiving, for each output
stream. (harsh via atm)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -41,6 +41,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPAC
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@ -212,6 +214,7 @@ public class DFSClient implements java.io.Closeable {
final int socketTimeout; final int socketTimeout;
final int socketCacheCapacity; final int socketCacheCapacity;
final long socketCacheExpiry; final long socketCacheExpiry;
final long excludedNodesCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught */ /** Wait time window (in msec) if BlockMissingException is caught */
final int timeWindow; final int timeWindow;
final int nCachedConnRetry; final int nCachedConnRetry;
@ -262,6 +265,9 @@ public class DFSClient implements java.io.Closeable {
DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT); DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT); DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
excludedNodesCacheExpiry = conf.getLong(
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY, prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize); 10 * defaultBlockSize);
timeWindow = conf timeWindow = conf

View File

@ -76,6 +76,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec"; public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000; public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
public static final String DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL = "dfs.client.write.exclude.nodes.cache.expiry.interval.millis";
public static final long DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10 * 60 * 1000; // 10 minutes, in ms
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address"; public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100"; public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address"; public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";

View File

@ -35,6 +35,7 @@ import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -82,6 +83,11 @@ import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
/**************************************************************** /****************************************************************
@ -289,7 +295,25 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
private DataInputStream blockReplyStream; private DataInputStream blockReplyStream;
private ResponseProcessor response = null; private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block private volatile DatanodeInfo[] nodes = null; // list of targets for current block
private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>(); private LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
CacheBuilder.newBuilder()
.expireAfterWrite(
dfsClient.getConf().excludedNodesCacheExpiry,
TimeUnit.MILLISECONDS)
.removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
@Override
public void onRemoval(
RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
DFSClient.LOG.info("Removing node " +
notification.getKey() + " from the excluded nodes list");
}
})
.build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
@Override
public DatanodeInfo load(DatanodeInfo key) throws Exception {
return key;
}
});
volatile boolean hasError = false; volatile boolean hasError = false;
volatile int errorIndex = -1; volatile int errorIndex = -1;
private BlockConstructionStage stage; // block construction stage private BlockConstructionStage stage; // block construction stage
@ -999,8 +1023,10 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
success = false; success = false;
long startTime = Time.now(); long startTime = Time.now();
DatanodeInfo[] excluded = excludedNodes.toArray( DatanodeInfo[] excluded =
new DatanodeInfo[excludedNodes.size()]); excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
.keySet()
.toArray(new DatanodeInfo[0]);
block = oldBlock; block = oldBlock;
lb = locateFollowingBlock(startTime, lb = locateFollowingBlock(startTime,
excluded.length > 0 ? excluded : null); excluded.length > 0 ? excluded : null);
@ -1019,7 +1045,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName); dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
block = null; block = null;
DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]); DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
excludedNodes.add(nodes[errorIndex]); excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
} }
} while (!success && --count >= 0); } while (!success && --count >= 0);

View File

@ -594,6 +594,17 @@
<description>Packet size for clients to write</description> <description>Packet size for clients to write</description>
</property> </property>
<property>
<name>dfs.client.write.exclude.nodes.cache.expiry.interval.millis</name>
<value>600000</value>
<description>The maximum period to keep a DN in the excluded nodes list
at a client. After this period, in milliseconds, the previously excluded node(s) will
be removed automatically from the cache and will be considered good for block allocations
again. Useful to lower or raise in situations where you keep a file open for very long
periods (such as a Write-Ahead-Log (WAL) file) to make the writer tolerant to cluster maintenance
restarts. Defaults to 10 minutes.</description>
</property>
<property> <property>
<name>dfs.namenode.checkpoint.dir</name> <name>dfs.namenode.checkpoint.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/namesecondary</value> <value>file://${hadoop.tmp.dir}/dfs/namesecondary</value>

View File

@ -21,20 +21,27 @@ import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.util.ThreadUtil;
import org.junit.Test; import org.junit.Test;
/** /**
* These tests make sure that DFSClient retries fetching data from DFS * These tests make sure that DFSClient excludes writing data to
* properly in case of errors. * a DN properly in case of errors.
*/ */
public class TestDFSClientExcludedNodes { public class TestDFSClientExcludedNodes {
@Test @Test(timeout=10000)
public void testExcludedNodes() throws IOException { public void testExcludedNodes() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
@ -43,14 +50,89 @@ public class TestDFSClientExcludedNodes {
// kill a datanode // kill a datanode
cluster.stopDataNode(AppendTestUtil.nextInt(3)); cluster.stopDataNode(AppendTestUtil.nextInt(3));
OutputStream out = fs.create(filePath, true, 4096); OutputStream out = fs.create(
filePath,
true,
4096,
(short) 3,
fs.getDefaultBlockSize(filePath)
);
out.write(20); out.write(20);
try { try {
out.close(); out.close();
} catch (Exception e) { } catch (Exception e) {
fail("DataNode failure should not result in a block abort: \n" + e.getMessage()); fail("Single DN failure should not result in a block abort: \n" +
e.getMessage());
} }
} }
@Test(timeout=10000)
public void testExcludedNodesForgiveness() throws IOException {
Configuration conf = new HdfsConfiguration();
// Forgive nodes in under 1s for this test case.
conf.setLong(
DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
1000);
// We'll be using a 512 bytes block size just for tests
// so making sure the checksum bytes too match it.
conf.setInt("io.bytes.per.checksum", 512);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
List<DataNodeProperties> props = cluster.dataNodes;
FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/testForgivingExcludedNodes");
// 256 bytes data chunk for writes
byte[] bytes = new byte[256];
for (int index=0; index<bytes.length; index++) {
bytes[index] = '0';
}
// File with a 512 bytes block size
FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
// Write a block to all 3 DNs (2x256bytes).
out.write(bytes);
out.write(bytes);
out.hflush();
// Remove two DNs, to put them into the exclude list.
DataNodeProperties two = cluster.stopDataNode(2);
DataNodeProperties one = cluster.stopDataNode(1);
// Write another block.
// At this point, we have two nodes already in excluded list.
out.write(bytes);
out.write(bytes);
out.hflush();
// Bring back the older DNs, since they are gonna be forgiven only
// afterwards of this previous block write.
Assert.assertEquals(true, cluster.restartDataNode(one, true));
Assert.assertEquals(true, cluster.restartDataNode(two, true));
cluster.waitActive();
// Sleep for 2s, to let the excluded nodes be expired
// from the excludes list (i.e. forgiven after the configured wait period).
// [Sleeping just in case the restart of the DNs completed < 2s cause
// otherwise, we'll end up quickly excluding those again.]
ThreadUtil.sleepAtLeastIgnoreInterrupts(2000);
// Terminate the last good DN, to assert that there's no
// single-DN-available scenario, caused by not forgiving the other
// two by now.
cluster.stopDataNode(0);
try {
// Attempt writing another block, which should still pass
// cause the previous two should have been forgiven by now,
// while the last good DN added to excludes this time.
out.write(bytes);
out.hflush();
out.close();
} catch (Exception e) {
fail("Excluded DataNodes should be forgiven after a while and " +
"not cause file writing exception of: '" + e.getMessage() + "'");
}
}
} }