HDFS-11754. Make FsServerDefaults cache configurable. Contributed by Mikhail Erofeev.
This commit is contained in:
parent
8498d287cd
commit
53509f295b
|
@ -24,6 +24,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACH
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;
|
||||||
|
|
||||||
|
@ -205,8 +207,6 @@ import com.google.common.net.InetAddresses;
|
||||||
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
DataEncryptionKeyFactory {
|
DataEncryptionKeyFactory {
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
|
public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
|
||||||
// 1 hour
|
|
||||||
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L;
|
|
||||||
private static final String DFS_KMS_PREFIX = "dfs-kms-";
|
private static final String DFS_KMS_PREFIX = "dfs-kms-";
|
||||||
|
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
|
@ -240,6 +240,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
|
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
|
||||||
private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
|
private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
|
||||||
private final int smallBufferSize;
|
private final int smallBufferSize;
|
||||||
|
private final long serverDefaultsValidityPeriod;
|
||||||
|
|
||||||
public DfsClientConf getConf() {
|
public DfsClientConf getConf() {
|
||||||
return dfsClientConf;
|
return dfsClientConf;
|
||||||
|
@ -371,6 +372,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
|
null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
|
||||||
Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
|
Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
|
||||||
null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
|
null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
|
||||||
|
this.serverDefaultsValidityPeriod =
|
||||||
|
conf.getLong(DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY,
|
||||||
|
DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT);
|
||||||
Boolean writeDropBehind =
|
Boolean writeDropBehind =
|
||||||
(conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
|
(conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
|
||||||
null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
|
null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
|
||||||
|
@ -663,7 +667,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
checkOpen();
|
checkOpen();
|
||||||
long now = Time.monotonicNow();
|
long now = Time.monotonicNow();
|
||||||
if ((serverDefaults == null) ||
|
if ((serverDefaults == null) ||
|
||||||
(now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD)) {
|
(now - serverDefaultsLastUpdate > serverDefaultsValidityPeriod)) {
|
||||||
serverDefaults = namenode.getServerDefaults();
|
serverDefaults = namenode.getServerDefaults();
|
||||||
serverDefaultsLastUpdate = now;
|
serverDefaultsLastUpdate = now;
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,6 +112,10 @@ public interface HdfsClientConfigKeys {
|
||||||
String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY =
|
String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY =
|
||||||
"dfs.client.max.block.acquire.failures";
|
"dfs.client.max.block.acquire.failures";
|
||||||
int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
|
int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
|
||||||
|
String DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY =
|
||||||
|
"dfs.client.server-defaults.validity.period.ms";
|
||||||
|
long DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT =
|
||||||
|
TimeUnit.HOURS.toMillis(1);
|
||||||
String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
|
String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
|
||||||
String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
|
String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
|
||||||
String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
|
String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
|
||||||
|
|
|
@ -2282,6 +2282,16 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.client.server-defaults.validity.period.ms</name>
|
||||||
|
<value>3600000</value>
|
||||||
|
<description>
|
||||||
|
The amount of milliseconds after which cached server defaults are updated.
|
||||||
|
|
||||||
|
By default this parameter is set to 1 hour.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.enable.retrycache</name>
|
<name>dfs.namenode.enable.retrycache</name>
|
||||||
<value>true</value>
|
<value>true</value>
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
|
||||||
|
@ -38,6 +39,7 @@ import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.junit.Assume.assumeTrue;
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
@ -51,6 +53,7 @@ import java.net.URI;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
@ -179,6 +182,106 @@ public class TestFileCreation {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that server default values are cached on the client size
|
||||||
|
* and are stale after namenode update.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testServerDefaultsWithCaching()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
// Create cluster with an explicit block size param
|
||||||
|
Configuration clusterConf = new HdfsConfiguration();
|
||||||
|
long originalBlockSize = DFS_BLOCK_SIZE_DEFAULT * 2;
|
||||||
|
clusterConf.setLong(DFS_BLOCK_SIZE_KEY, originalBlockSize);
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(clusterConf)
|
||||||
|
.numDataNodes(0)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
// Set a spy namesystem inside the namenode and return it
|
||||||
|
FSNamesystem spyNamesystem =
|
||||||
|
NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
|
||||||
|
InetSocketAddress nameNodeAddr = cluster.getNameNode().getNameNodeAddress();
|
||||||
|
try {
|
||||||
|
// Create a dfs client and set a long enough validity interval
|
||||||
|
Configuration clientConf = new HdfsConfiguration();
|
||||||
|
clientConf.setLong(DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY,
|
||||||
|
TimeUnit.MINUTES.toMillis(1));
|
||||||
|
DFSClient dfsClient = new DFSClient(nameNodeAddr, clientConf);
|
||||||
|
FsServerDefaults defaults = dfsClient.getServerDefaults();
|
||||||
|
assertEquals(originalBlockSize, defaults.getBlockSize());
|
||||||
|
|
||||||
|
// Update the namenode with a new parameter
|
||||||
|
long updatedDefaultBlockSize = DFS_BLOCK_SIZE_DEFAULT * 3;
|
||||||
|
FsServerDefaults newDefaults =
|
||||||
|
new FsServerDefaults(updatedDefaultBlockSize,
|
||||||
|
defaults.getBytesPerChecksum(), defaults.getWritePacketSize(),
|
||||||
|
defaults.getReplication(), defaults.getFileBufferSize(),
|
||||||
|
defaults.getEncryptDataTransfer(), defaults.getTrashInterval(),
|
||||||
|
defaults.getChecksumType(), defaults.getKeyProviderUri(),
|
||||||
|
defaults.getDefaultStoragePolicyId());
|
||||||
|
doReturn(newDefaults).when(spyNamesystem).getServerDefaults();
|
||||||
|
|
||||||
|
// The value is stale
|
||||||
|
Thread.sleep(1);
|
||||||
|
defaults = dfsClient.getServerDefaults();
|
||||||
|
assertEquals(originalBlockSize, defaults.getBlockSize());
|
||||||
|
|
||||||
|
// Another client reads the updated value correctly
|
||||||
|
DFSClient newDfsClient = new DFSClient(nameNodeAddr, clientConf);
|
||||||
|
defaults = newDfsClient.getServerDefaults();
|
||||||
|
assertEquals(updatedDefaultBlockSize, defaults.getBlockSize());
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that server defaults are updated on the client after cache expiration.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testServerDefaultsWithMinimalCaching()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
// Create cluster with an explicit block size param
|
||||||
|
Configuration clusterConf = new HdfsConfiguration();
|
||||||
|
long originalBlockSize = DFS_BLOCK_SIZE_DEFAULT * 2;
|
||||||
|
clusterConf.setLong(DFS_BLOCK_SIZE_KEY, originalBlockSize);
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(clusterConf)
|
||||||
|
.numDataNodes(0)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
// Set a spy namesystem inside the namenode and return it
|
||||||
|
FSNamesystem spyNamesystem =
|
||||||
|
NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
|
||||||
|
InetSocketAddress nameNodeAddr = cluster.getNameNode().getNameNodeAddress();
|
||||||
|
try {
|
||||||
|
// Create a dfs client and set a minimal validity interval
|
||||||
|
Configuration clientConf = new HdfsConfiguration();
|
||||||
|
// Invalidate cache in at most 1 ms, see DfsClient#getServerDefaults
|
||||||
|
clientConf.setLong(DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY, 0L);
|
||||||
|
DFSClient dfsClient = new DFSClient(nameNodeAddr, clientConf);
|
||||||
|
FsServerDefaults defaults = dfsClient.getServerDefaults();
|
||||||
|
assertEquals(originalBlockSize, defaults.getBlockSize());
|
||||||
|
|
||||||
|
// Update the namenode with a new FsServerDefaults
|
||||||
|
long updatedDefaultBlockSize = DFS_BLOCK_SIZE_DEFAULT * 3;
|
||||||
|
FsServerDefaults newDefaults =
|
||||||
|
new FsServerDefaults(updatedDefaultBlockSize,
|
||||||
|
defaults.getBytesPerChecksum(), defaults.getWritePacketSize(),
|
||||||
|
defaults.getReplication(), defaults.getFileBufferSize(),
|
||||||
|
defaults.getEncryptDataTransfer(), defaults.getTrashInterval(),
|
||||||
|
defaults.getChecksumType(), defaults.getKeyProviderUri(),
|
||||||
|
defaults.getDefaultStoragePolicyId());
|
||||||
|
doReturn(newDefaults).when(spyNamesystem).getServerDefaults();
|
||||||
|
|
||||||
|
Thread.sleep(1);
|
||||||
|
defaults = dfsClient.getServerDefaults();
|
||||||
|
// Value is updated correctly
|
||||||
|
assertEquals(updatedDefaultBlockSize, defaults.getBlockSize());
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFileCreation() throws IOException {
|
public void testFileCreation() throws IOException {
|
||||||
checkFileCreation(null, false);
|
checkFileCreation(null, false);
|
||||||
|
|
Loading…
Reference in New Issue