HDFS-8962. Clean up checkstyle warnings in o.a.h.hdfs.DfsClientConf. Contributed by Mingliang Liu.
This commit is contained in:
parent
0bf285413f
commit
7e971b7315
|
@ -113,7 +113,7 @@ public interface HdfsClientConfigKeys {
|
|||
"dfs.datanode.hdfs-blocks-metadata.enabled";
|
||||
boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
|
||||
|
||||
static final String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY =
|
||||
String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY =
|
||||
PREFIX + "replica.accessor.builder.classes";
|
||||
|
||||
/** dfs.client.retry configuration properties */
|
||||
|
|
|
@ -45,10 +45,6 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATA
|
|||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
|
||||
|
@ -71,8 +67,6 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRIT
|
|||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
|
||||
|
@ -256,7 +250,7 @@ public class DfsClientConf {
|
|||
return Collections.emptyList();
|
||||
}
|
||||
ArrayList<Class<? extends ReplicaAccessorBuilder>> classes =
|
||||
new ArrayList<Class<? extends ReplicaAccessorBuilder>>();
|
||||
new ArrayList<>();
|
||||
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
||||
for (String className: classNames) {
|
||||
try {
|
||||
|
@ -751,35 +745,33 @@ public class DfsClientConf {
|
|||
}
|
||||
|
||||
public String confAsString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("shortCircuitStreamsCacheSize = ").
|
||||
append(shortCircuitStreamsCacheSize).
|
||||
append(", shortCircuitStreamsCacheExpiryMs = ").
|
||||
append(shortCircuitStreamsCacheExpiryMs).
|
||||
append(", shortCircuitMmapCacheSize = ").
|
||||
append(shortCircuitMmapCacheSize).
|
||||
append(", shortCircuitMmapCacheExpiryMs = ").
|
||||
append(shortCircuitMmapCacheExpiryMs).
|
||||
append(", shortCircuitMmapCacheRetryTimeout = ").
|
||||
append(shortCircuitMmapCacheRetryTimeout).
|
||||
append(", shortCircuitCacheStaleThresholdMs = ").
|
||||
append(shortCircuitCacheStaleThresholdMs).
|
||||
append(", socketCacheCapacity = ").
|
||||
append(socketCacheCapacity).
|
||||
append(", socketCacheExpiry = ").
|
||||
append(socketCacheExpiry).
|
||||
append(", shortCircuitLocalReads = ").
|
||||
append(shortCircuitLocalReads).
|
||||
append(", useLegacyBlockReaderLocal = ").
|
||||
append(useLegacyBlockReaderLocal).
|
||||
append(", domainSocketDataTraffic = ").
|
||||
append(domainSocketDataTraffic).
|
||||
append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = ").
|
||||
append(shortCircuitSharedMemoryWatcherInterruptCheckMs).
|
||||
append(", keyProviderCacheExpiryMs = ").
|
||||
append(keyProviderCacheExpiryMs);
|
||||
|
||||
return builder.toString();
|
||||
return "shortCircuitStreamsCacheSize = "
|
||||
+ shortCircuitStreamsCacheSize
|
||||
+ ", shortCircuitStreamsCacheExpiryMs = "
|
||||
+ shortCircuitStreamsCacheExpiryMs
|
||||
+ ", shortCircuitMmapCacheSize = "
|
||||
+ shortCircuitMmapCacheSize
|
||||
+ ", shortCircuitMmapCacheExpiryMs = "
|
||||
+ shortCircuitMmapCacheExpiryMs
|
||||
+ ", shortCircuitMmapCacheRetryTimeout = "
|
||||
+ shortCircuitMmapCacheRetryTimeout
|
||||
+ ", shortCircuitCacheStaleThresholdMs = "
|
||||
+ shortCircuitCacheStaleThresholdMs
|
||||
+ ", socketCacheCapacity = "
|
||||
+ socketCacheCapacity
|
||||
+ ", socketCacheExpiry = "
|
||||
+ socketCacheExpiry
|
||||
+ ", shortCircuitLocalReads = "
|
||||
+ shortCircuitLocalReads
|
||||
+ ", useLegacyBlockReaderLocal = "
|
||||
+ useLegacyBlockReaderLocal
|
||||
+ ", domainSocketDataTraffic = "
|
||||
+ domainSocketDataTraffic
|
||||
+ ", shortCircuitSharedMemoryWatcherInterruptCheckMs = "
|
||||
+ shortCircuitSharedMemoryWatcherInterruptCheckMs
|
||||
+ ", keyProviderCacheExpiryMs = "
|
||||
+ keyProviderCacheExpiryMs;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -850,6 +850,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-8248. Store INodeId instead of the INodeFile object in
|
||||
BlockInfoContiguous. (wheat9)
|
||||
|
||||
HDFS-8962. Clean up checkstyle warnings in o.a.h.hdfs.DfsClientConf.
|
||||
(Mingliang Liu via wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -56,7 +56,6 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
|
|||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
|
||||
import org.apache.hadoop.io.ByteWritable;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
|
@ -328,10 +327,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
* If there was another problem.
|
||||
*/
|
||||
public BlockReader build() throws IOException {
|
||||
BlockReader reader = null;
|
||||
|
||||
Preconditions.checkNotNull(configuration);
|
||||
reader = tryToCreateExternalBlockReader();
|
||||
BlockReader reader = tryToCreateExternalBlockReader();
|
||||
if (reader != null) {
|
||||
return reader;
|
||||
}
|
||||
|
@ -432,7 +429,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
"disableLegacyBlockReaderLocal is set.", this);
|
||||
return null;
|
||||
}
|
||||
IOException ioe = null;
|
||||
IOException ioe;
|
||||
try {
|
||||
return BlockReaderLocalLegacy.newBlockReader(conf,
|
||||
userGroupInformation, configuration, fileName, block, token,
|
||||
|
|
|
@ -703,10 +703,10 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
|
||||
mirrorSock = datanode.newSocket();
|
||||
try {
|
||||
int timeoutValue = dnConf.socketTimeout
|
||||
+ (HdfsConstants.READ_TIMEOUT_EXTENSION * targets.length);
|
||||
int writeTimeout = dnConf.socketWriteTimeout +
|
||||
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
|
||||
int timeoutValue = dnConf.socketTimeout +
|
||||
(HdfsConstants.READ_TIMEOUT_EXTENSION * targets.length);
|
||||
int writeTimeout = dnConf.socketWriteTimeout +
|
||||
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
|
||||
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
|
||||
mirrorSock.setSoTimeout(timeoutValue);
|
||||
mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
|
||||
|
|
|
@ -109,7 +109,7 @@ public class TestDataTransferProtocol {
|
|||
sock = new Socket();
|
||||
sock.connect(dnAddr, HdfsConstants.READ_TIMEOUT);
|
||||
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
|
||||
|
||||
|
||||
OutputStream out = sock.getOutputStream();
|
||||
// Should we excuse
|
||||
byte[] retBuf = new byte[recvBuf.size()];
|
||||
|
|
|
@ -36,6 +36,7 @@ import java.net.SocketTimeoutException;
|
|||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -82,7 +83,7 @@ import org.mockito.InOrder;
|
|||
public class TestDistributedFileSystem {
|
||||
private static final Random RAN = new Random();
|
||||
|
||||
{
|
||||
static {
|
||||
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
|
||||
}
|
||||
|
||||
|
@ -93,16 +94,17 @@ public class TestDistributedFileSystem {
|
|||
private HdfsConfiguration getTestConfiguration() {
|
||||
HdfsConfiguration conf;
|
||||
if (noXmlDefaults) {
|
||||
conf = new HdfsConfiguration(false);
|
||||
String namenodeDir = new File(MiniDFSCluster.getBaseDirectory(), "name").getAbsolutePath();
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, namenodeDir);
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, namenodeDir);
|
||||
conf = new HdfsConfiguration(false);
|
||||
String namenodeDir = new File(MiniDFSCluster.getBaseDirectory(), "name").
|
||||
getAbsolutePath();
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, namenodeDir);
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, namenodeDir);
|
||||
} else {
|
||||
conf = new HdfsConfiguration();
|
||||
conf = new HdfsConfiguration();
|
||||
}
|
||||
if (dualPortTesting) {
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
||||
"localhost:0");
|
||||
"localhost:0");
|
||||
}
|
||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
|
||||
|
||||
|
@ -118,14 +120,17 @@ public class TestDistributedFileSystem {
|
|||
FileSystem fileSys = cluster.getFileSystem();
|
||||
fileSys.getDelegationToken("");
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFileSystemCloseAll() throws Exception {
|
||||
Configuration conf = getTestConfiguration();
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).
|
||||
build();
|
||||
URI address = FileSystem.getDefaultUri(conf);
|
||||
|
||||
try {
|
||||
|
@ -640,7 +645,8 @@ public class TestDistributedFileSystem {
|
|||
}
|
||||
|
||||
/** Checks statistics. -1 indicates do not check for the operations */
|
||||
private void checkStatistics(FileSystem fs, int readOps, int writeOps, int largeReadOps) {
|
||||
private void checkStatistics(FileSystem fs, int readOps, int writeOps,
|
||||
int largeReadOps) {
|
||||
assertEquals(readOps, DFSTestUtil.getStatistics(fs).getReadOps());
|
||||
assertEquals(writeOps, DFSTestUtil.getStatistics(fs).getWriteOps());
|
||||
assertEquals(largeReadOps, DFSTestUtil.getStatistics(fs).getLargeReadOps());
|
||||
|
@ -654,7 +660,8 @@ public class TestDistributedFileSystem {
|
|||
|
||||
final Configuration conf = getTestConfiguration();
|
||||
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(2).build();
|
||||
final FileSystem hdfs = cluster.getFileSystem();
|
||||
|
||||
final String nnAddr = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
|
@ -694,7 +701,8 @@ public class TestDistributedFileSystem {
|
|||
|
||||
final Path dir = new Path("/filechecksum");
|
||||
final int block_size = 1024;
|
||||
final int buffer_size = conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
|
||||
final int buffer_size = conf.getInt(
|
||||
CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
|
||||
conf.setInt(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
|
||||
|
||||
//try different number of blocks
|
||||
|
@ -722,14 +730,15 @@ public class TestDistributedFileSystem {
|
|||
System.out.println("webhdfsfoocs=" + webhdfsfoocs);
|
||||
|
||||
final Path webhdfsqualified = new Path(webhdfsuri + dir, "foo" + n);
|
||||
final FileChecksum webhdfs_qfoocs = webhdfs.getFileChecksum(webhdfsqualified);
|
||||
final FileChecksum webhdfs_qfoocs =
|
||||
webhdfs.getFileChecksum(webhdfsqualified);
|
||||
System.out.println("webhdfs_qfoocs=" + webhdfs_qfoocs);
|
||||
|
||||
//create a zero byte file
|
||||
final Path zeroByteFile = new Path(dir, "zeroByteFile" + n);
|
||||
{
|
||||
final FSDataOutputStream out = hdfs.create(zeroByteFile, false, buffer_size,
|
||||
(short)2, block_size);
|
||||
final FSDataOutputStream out = hdfs.create(zeroByteFile, false,
|
||||
buffer_size, (short)2, block_size);
|
||||
out.close();
|
||||
}
|
||||
|
||||
|
@ -843,9 +852,7 @@ public class TestDistributedFileSystem {
|
|||
String[] ids = loc.getStorageIds();
|
||||
// Run it through a set to deduplicate, since there should be no dupes
|
||||
Set<String> storageIds = new HashSet<>();
|
||||
for (String id: ids) {
|
||||
storageIds.add(id);
|
||||
}
|
||||
Collections.addAll(storageIds, ids);
|
||||
assertEquals("Unexpected num storage ids", repl, storageIds.size());
|
||||
// Make sure these are all valid storage IDs
|
||||
assertTrue("Unknown storage IDs found!", dnStorageIds.containsAll
|
||||
|
@ -936,9 +943,7 @@ public class TestDistributedFileSystem {
|
|||
output.close();
|
||||
assertTrue("File status should be closed", fs.isFileClosed(file));
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -953,8 +958,9 @@ public class TestDistributedFileSystem {
|
|||
final Path relative = new Path("relative");
|
||||
fs.create(new Path(relative, "foo")).close();
|
||||
|
||||
final List<LocatedFileStatus> retVal = new ArrayList<LocatedFileStatus>();
|
||||
final RemoteIterator<LocatedFileStatus> iter = fs.listFiles(relative, true);
|
||||
final List<LocatedFileStatus> retVal = new ArrayList<>();
|
||||
final RemoteIterator<LocatedFileStatus> iter =
|
||||
fs.listFiles(relative, true);
|
||||
while (iter.hasNext()) {
|
||||
retVal.add(iter.next());
|
||||
}
|
||||
|
@ -969,7 +975,7 @@ public class TestDistributedFileSystem {
|
|||
final int timeout = 1000;
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
|
||||
|
||||
|
||||
// only need cluster to create a dfs client to get a peer
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
try {
|
||||
|
|
|
@ -51,7 +51,7 @@ public class TestCachingStrategy {
|
|||
private static final Log LOG = LogFactory.getLog(TestCachingStrategy.class);
|
||||
private static final int MAX_TEST_FILE_LEN = 1024 * 1024;
|
||||
private static final int WRITE_PACKET_SIZE = HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||
|
||||
|
||||
private final static TestRecordingCacheTracker tracker =
|
||||
new TestRecordingCacheTracker();
|
||||
|
||||
|
@ -113,7 +113,7 @@ public class TestCachingStrategy {
|
|||
}
|
||||
|
||||
private static class TestRecordingCacheTracker extends CacheManipulator {
|
||||
private final Map<String, Stats> map = new TreeMap<String, Stats>();
|
||||
private final Map<String, Stats> map = new TreeMap<>();
|
||||
|
||||
@Override
|
||||
public void posixFadviseIfPossible(String name,
|
||||
|
@ -365,8 +365,6 @@ public class TestCachingStrategy {
|
|||
|
||||
// read file
|
||||
readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, false);
|
||||
// verify that we dropped everything from the cache.
|
||||
Assert.assertNull(stats);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
|
@ -389,13 +387,10 @@ public class TestCachingStrategy {
|
|||
FileSystem fs = cluster.getFileSystem();
|
||||
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, false);
|
||||
// verify that we can seek after setDropBehind
|
||||
FSDataInputStream fis = fs.open(new Path(TEST_PATH));
|
||||
try {
|
||||
try (FSDataInputStream fis = fs.open(new Path(TEST_PATH))) {
|
||||
Assert.assertTrue(fis.read() != -1); // create BlockReader
|
||||
fis.setDropBehind(false); // clear BlockReader
|
||||
fis.seek(2); // seek
|
||||
} finally {
|
||||
fis.close();
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
|
|
|
@ -71,7 +71,6 @@ import org.junit.Assert;
|
|||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
|
@ -269,7 +268,7 @@ public class TestShortCircuitCache {
|
|||
}
|
||||
// The last two replicas should still be cached.
|
||||
for (int i = 1; i < pairs.length; i++) {
|
||||
final Integer iVal = new Integer(i);
|
||||
final Integer iVal = i;
|
||||
replicaInfos[i] = cache.fetchOrCreate(
|
||||
new ExtendedBlockId(i, "test_bp1"),
|
||||
new ShortCircuitReplicaCreator() {
|
||||
|
@ -322,7 +321,7 @@ public class TestShortCircuitCache {
|
|||
};
|
||||
final long HOUR_IN_MS = 60 * 60 * 1000;
|
||||
for (int i = 0; i < pairs.length; i++) {
|
||||
final Integer iVal = new Integer(i);
|
||||
final Integer iVal = i;
|
||||
final ExtendedBlockId key = new ExtendedBlockId(i, "test_bp1");
|
||||
replicaInfos[i] = cache.fetchOrCreate(key,
|
||||
new ShortCircuitReplicaCreator() {
|
||||
|
|
|
@ -94,17 +94,17 @@ public class TestShortCircuitLocalRead {
|
|||
public void before() {
|
||||
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
||||
}
|
||||
|
||||
|
||||
static final long seed = 0xDEADBEEFL;
|
||||
static final int blockSize = 5120;
|
||||
final boolean simulatedStorage = false;
|
||||
|
||||
|
||||
// creates a file but does not close it
|
||||
static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
|
||||
throws IOException {
|
||||
FSDataOutputStream stm = fileSys.create(name, true,
|
||||
fileSys.getConf().getInt("io.file.buffer.size", 4096),
|
||||
(short)repl, blockSize);
|
||||
fileSys.getConf().getInt("io.file.buffer.size", 4096),
|
||||
(short)repl, blockSize);
|
||||
return stm;
|
||||
}
|
||||
|
||||
|
@ -112,19 +112,20 @@ public class TestShortCircuitLocalRead {
|
|||
String message) {
|
||||
checkData(actual, from, expected, actual.length, message);
|
||||
}
|
||||
|
||||
static private void checkData(byte[] actual, int from, byte[] expected, int len,
|
||||
String message) {
|
||||
|
||||
static private void checkData(byte[] actual, int from, byte[] expected,
|
||||
int len, String message) {
|
||||
for (int idx = 0; idx < len; idx++) {
|
||||
if (expected[from + idx] != actual[idx]) {
|
||||
Assert.fail(message + " byte " + (from + idx) + " differs. expected "
|
||||
+ expected[from + idx] + " actual " + actual[idx] +
|
||||
"\nexpected: " + StringUtils.byteToHexString(expected, from, from + len) +
|
||||
Assert.fail(message + " byte " + (from + idx) + " differs. expected " +
|
||||
expected[from + idx] + " actual " + actual[idx] +
|
||||
"\nexpected: " +
|
||||
StringUtils.byteToHexString(expected, from, from + len) +
|
||||
"\nactual: " + StringUtils.byteToHexString(actual, 0, len));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static String getCurrentUser() throws IOException {
|
||||
return UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
}
|
||||
|
@ -140,7 +141,7 @@ public class TestShortCircuitLocalRead {
|
|||
if (legacyShortCircuitFails) {
|
||||
assertFalse(getClientContext.getDisableLegacyBlockReaderLocal());
|
||||
}
|
||||
|
||||
|
||||
FSDataInputStream stm = fs.open(name);
|
||||
byte[] actual = new byte[expected.length-readOffset];
|
||||
stm.readFully(readOffset, actual);
|
||||
|
@ -165,7 +166,7 @@ public class TestShortCircuitLocalRead {
|
|||
nread += nbytes;
|
||||
}
|
||||
checkData(actual, readOffset, expected, "Read 3");
|
||||
|
||||
|
||||
if (legacyShortCircuitFails) {
|
||||
assertTrue(getClientContext.getDisableLegacyBlockReaderLocal());
|
||||
}
|
||||
|
@ -179,7 +180,7 @@ public class TestShortCircuitLocalRead {
|
|||
alt.get(arr);
|
||||
return arr;
|
||||
}
|
||||
|
||||
|
||||
/** Check the file content, reading as user {@code readingUser} */
|
||||
static void checkFileContentDirect(URI uri, Path name, byte[] expected,
|
||||
int readOffset, String readingUser, Configuration conf,
|
||||
|
@ -191,7 +192,7 @@ public class TestShortCircuitLocalRead {
|
|||
if (legacyShortCircuitFails) {
|
||||
assertTrue(clientContext.getDisableLegacyBlockReaderLocal());
|
||||
}
|
||||
|
||||
|
||||
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
|
||||
|
||||
ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
|
||||
|
@ -239,7 +240,7 @@ public class TestShortCircuitLocalRead {
|
|||
doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
|
||||
null, getCurrentUser(), false);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test that file data can be read by reading the block file
|
||||
* directly from the local store.
|
||||
|
@ -272,15 +273,15 @@ public class TestShortCircuitLocalRead {
|
|||
try {
|
||||
// check that / exists
|
||||
Path path = new Path("/");
|
||||
assertTrue("/ should be a directory", fs.getFileStatus(path)
|
||||
.isDirectory() == true);
|
||||
|
||||
assertTrue("/ should be a directory",
|
||||
fs.getFileStatus(path).isDirectory());
|
||||
|
||||
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
||||
Path file1 = fs.makeQualified(new Path("filelocal.dat"));
|
||||
FSDataOutputStream stm = createFile(fs, file1, 1);
|
||||
stm.write(fileData);
|
||||
stm.close();
|
||||
|
||||
|
||||
URI uri = cluster.getURI();
|
||||
checkFileContent(uri, file1, fileData, readOffset, readingUser, conf,
|
||||
legacyShortCircuitFails);
|
||||
|
@ -301,7 +302,7 @@ public class TestShortCircuitLocalRead {
|
|||
public void testFileLocalReadChecksum() throws Exception {
|
||||
doTestShortCircuitRead(false, 3*blockSize+100, 0);
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testSmallFileLocalRead() throws Exception {
|
||||
doTestShortCircuitRead(false, 13, 0);
|
||||
|
@ -309,7 +310,7 @@ public class TestShortCircuitLocalRead {
|
|||
doTestShortCircuitRead(true, 13, 0);
|
||||
doTestShortCircuitRead(true, 13, 5);
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testLocalReadLegacy() throws Exception {
|
||||
doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(),
|
||||
|
@ -325,13 +326,13 @@ public class TestShortCircuitLocalRead {
|
|||
public void testLocalReadFallback() throws Exception {
|
||||
doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(), "notallowed", true);
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testReadFromAnOffset() throws Exception {
|
||||
doTestShortCircuitRead(false, 3*blockSize+100, 777);
|
||||
doTestShortCircuitRead(true, 3*blockSize+100, 777);
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testLongFile() throws Exception {
|
||||
doTestShortCircuitRead(false, 10*blockSize+100, 777);
|
||||
|
@ -348,7 +349,7 @@ public class TestShortCircuitLocalRead {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testDeprecatedGetBlockLocalPathInfoRpc() throws IOException {
|
||||
final Configuration conf = new Configuration();
|
||||
|
@ -399,18 +400,18 @@ public class TestShortCircuitLocalRead {
|
|||
try {
|
||||
// check that / exists
|
||||
Path path = new Path("/");
|
||||
assertTrue("/ should be a directory", fs.getFileStatus(path)
|
||||
.isDirectory() == true);
|
||||
|
||||
assertTrue("/ should be a directory",
|
||||
fs.getFileStatus(path).isDirectory());
|
||||
|
||||
byte[] fileData = AppendTestUtil.randomBytes(seed, size*3);
|
||||
// create a new file in home directory. Do not close it.
|
||||
Path file1 = new Path("filelocal.dat");
|
||||
FSDataOutputStream stm = createFile(fs, file1, 1);
|
||||
|
||||
|
||||
// write to file
|
||||
stm.write(fileData);
|
||||
stm.close();
|
||||
|
||||
|
||||
// now test the skip function
|
||||
FSDataInputStream instm = fs.open(file1);
|
||||
byte[] actual = new byte[fileData.length];
|
||||
|
@ -421,7 +422,6 @@ public class TestShortCircuitLocalRead {
|
|||
instm.seek(skipped);
|
||||
nread = instm.read(actual, (int)(skipped + nread), 3);
|
||||
instm.close();
|
||||
|
||||
} finally {
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
|
@ -443,7 +443,7 @@ public class TestShortCircuitLocalRead {
|
|||
final long RANDOM_SEED2 = 4568L;
|
||||
FSDataInputStream fsIn = null;
|
||||
final int TEST_LENGTH = 3456;
|
||||
|
||||
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
|
@ -470,14 +470,11 @@ public class TestShortCircuitLocalRead {
|
|||
File dataFile = cluster.getBlockFile(0, block);
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
RandomAccessFile raf = null;
|
||||
try {
|
||||
raf = new RandomAccessFile(dataFile, "rw");
|
||||
try (RandomAccessFile raf = new RandomAccessFile(dataFile, "rw")) {
|
||||
raf.setLength(0);
|
||||
} finally {
|
||||
if (raf != null) raf.close();
|
||||
}
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false).build();
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
fsIn = fs.open(TEST_PATH);
|
||||
|
@ -509,7 +506,7 @@ public class TestShortCircuitLocalRead {
|
|||
if (cluster != null) cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test to run benchmarks between short circuit read vs regular read with
|
||||
* specified number of threads simultaneously reading.
|
||||
|
@ -535,16 +532,16 @@ public class TestShortCircuitLocalRead {
|
|||
"/tmp/TestShortCircuitLocalRead._PORT");
|
||||
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||
checksum);
|
||||
|
||||
//Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
|
||||
|
||||
// Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
|
||||
int fileSize = 1000 * blockSize + 100; // File with 1000 blocks
|
||||
final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
|
||||
|
||||
|
||||
// create a new file in home directory. Do not close it.
|
||||
final Path file1 = new Path("filelocal.dat");
|
||||
final FileSystem fs = FileSystem.get(conf);
|
||||
FSDataOutputStream stm = createFile(fs, file1, 1);
|
||||
|
||||
|
||||
stm.write(dataToWrite);
|
||||
stm.close();
|
||||
|
||||
|
@ -580,8 +577,10 @@ public class TestShortCircuitLocalRead {
|
|||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testReadWithRemoteBlockReader() throws IOException, InterruptedException {
|
||||
doTestShortCircuitReadWithRemoteBlockReader(true, 3*blockSize+100, getCurrentUser(), 0, false);
|
||||
public void testReadWithRemoteBlockReader()
|
||||
throws IOException, InterruptedException {
|
||||
doTestShortCircuitReadWithRemoteBlockReader(true, 3 * blockSize + 100,
|
||||
getCurrentUser(), 0, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -589,8 +588,9 @@ public class TestShortCircuitLocalRead {
|
|||
* through RemoteBlockReader
|
||||
* @throws IOException
|
||||
*/
|
||||
public void doTestShortCircuitReadWithRemoteBlockReader(boolean ignoreChecksum, int size, String shortCircuitUser,
|
||||
int readOffset, boolean shortCircuitFails) throws IOException, InterruptedException {
|
||||
public void doTestShortCircuitReadWithRemoteBlockReader(boolean ignoreChecksum,
|
||||
int size, String shortCircuitUser, int readOffset,
|
||||
boolean shortCircuitFails) throws IOException, InterruptedException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
|
||||
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||
|
@ -601,8 +601,7 @@ public class TestShortCircuitLocalRead {
|
|||
// check that / exists
|
||||
Path path = new Path("/");
|
||||
URI uri = cluster.getURI();
|
||||
assertTrue("/ should be a directory", fs.getFileStatus(path)
|
||||
.isDirectory() == true);
|
||||
assertTrue("/ should be a directory", fs.getFileStatus(path).isDirectory());
|
||||
|
||||
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
||||
Path file1 = new Path("filelocal.dat");
|
||||
|
@ -627,7 +626,7 @@ public class TestShortCircuitLocalRead {
|
|||
}
|
||||
|
||||
private boolean checkUnsupportedMethod(FileSystem fs, Path file,
|
||||
byte[] expected, int readOffset) throws IOException {
|
||||
byte[] expected, int readOffset) throws IOException {
|
||||
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
|
||||
ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
|
||||
IOUtils.skipFully(stm, readOffset);
|
||||
|
@ -639,5 +638,4 @@ public class TestShortCircuitLocalRead {
|
|||
return false;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue