HDFS-8057 Move BlockReader implementation to the client implementation package. Contributed by Takanobu Asanuma

This commit is contained in:
Tsz-Wo Nicholas Sze 2016-05-25 12:12:27 -07:00
parent 0ac8c098e8
commit e5dab68066
29 changed files with 164 additions and 113 deletions

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
@ -34,7 +34,15 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.ReplicaAccessor;
import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
@ -646,7 +654,7 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
}
/**
* Get a RemoteBlockReader that communicates over a UNIX domain socket.
* Get a BlockReaderRemote that communicates over a UNIX domain socket.
*
* @return The new BlockReader, or null if we failed to create the block
* reader.
@ -709,7 +717,7 @@ private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
}
/**
* Get a RemoteBlockReader that communicates over a TCP socket.
* Get a BlockReaderRemote that communicates over a TCP socket.
*
* @return The new BlockReader. We will not return null, but instead throw
* an exception if this fails.
@ -837,13 +845,13 @@ private static boolean isSecurityException(IOException ioe) {
private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
int networkDistance = clientContext.getNetworkDistance(datanode);
if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
return RemoteBlockReader.newBlockReader(fileName,
return BlockReaderRemote.newBlockReader(fileName,
block, token, startOffset, length, conf.getIoBufferSize(),
verifyChecksum, clientName, peer, datanode,
clientContext.getPeerCache(), cachingStrategy, tracer,
networkDistance);
} else {
return RemoteBlockReader2.newBlockReader(
return BlockReaderRemote2.newBlockReader(
fileName, block, token, startOffset, length,
verifyChecksum, clientName, peer, datanode,
clientContext.getPeerCache(), cachingStrategy, tracer,

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import java.io.DataInputStream;
import java.io.File;
@ -33,8 +33,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@ -29,6 +29,8 @@
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.PeerCache;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -54,13 +56,13 @@
/**
* @deprecated this is an old implementation that is being left around
* in case any issues spring up with the new {@link RemoteBlockReader2}
* in case any issues spring up with the new {@link BlockReaderRemote2}
* implementation.
* It will be removed in the next release.
*/
@InterfaceAudience.Private
@Deprecated
public class RemoteBlockReader extends FSInputChecker implements BlockReader {
public class BlockReaderRemote extends FSInputChecker implements BlockReader {
static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
private final Peer peer;
@ -209,7 +211,7 @@ protected synchronized int readChunk(long pos, byte[] buf, int offset,
int len, byte[] checksumBuf)
throws IOException {
try (TraceScope ignored = tracer.newScope(
"RemoteBlockReader#readChunk(" + blockId + ")")) {
"BlockReaderRemote#readChunk(" + blockId + ")")) {
return readChunkImpl(pos, buf, offset, len, checksumBuf);
}
}
@ -335,7 +337,7 @@ private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
return bytesToRead;
}
private RemoteBlockReader(String file, String bpid, long blockId,
private BlockReaderRemote(String file, String bpid, long blockId,
DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
@ -386,7 +388,7 @@ private RemoteBlockReader(String file, String bpid, long blockId,
* @param clientName Client name
* @return New BlockReader instance, or null on error.
*/
public static RemoteBlockReader newBlockReader(String file,
public static BlockReaderRemote newBlockReader(String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
@ -412,7 +414,7 @@ public static RemoteBlockReader newBlockReader(String file,
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(in));
RemoteBlockReader2.checkSuccess(status, peer, block, file);
BlockReaderRemote2.checkSuccess(status, peer, block, file);
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();
DataChecksum checksum = DataTransferProtoUtil.fromProto(
@ -429,7 +431,7 @@ public static RemoteBlockReader newBlockReader(String file,
startOffset + " for file " + file);
}
return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
return new BlockReaderRemote(file, block.getBlockPoolId(), block.getBlockId(),
in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
peer, datanodeID, peerCache, tracer, networkDistance);
}
@ -467,7 +469,7 @@ public int readAll(byte[] buf, int offset, int len) throws IOException {
void sendReadResult(Peer peer, Status statusCode) {
assert !sentStatusCode : "already sent status code to " + peer;
try {
RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
BlockReaderRemote2.writeReadResult(peer.getOutputStream(), statusCode);
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
@ -478,14 +480,14 @@ void sendReadResult(Peer peer, Status statusCode) {
@Override
public int read(ByteBuffer buf) throws IOException {
throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
throw new UnsupportedOperationException("readDirect unsupported in BlockReaderRemote");
}
@Override
public int available() {
// An optimistic estimate of how much data is available
// to us without doing network I/O.
return RemoteBlockReader2.TCP_WINDOW_SIZE;
return BlockReaderRemote2.TCP_WINDOW_SIZE;
}
@Override

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@ -30,6 +30,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.PeerCache;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -78,13 +80,13 @@
*
* This is a new implementation introduced in Hadoop 0.23 which
* is more efficient and simpler than the older BlockReader
* implementation. It should be renamed to RemoteBlockReader
* implementation. It should be renamed to BlockReaderRemote
* once we are confident in it.
*/
@InterfaceAudience.Private
public class RemoteBlockReader2 implements BlockReader {
public class BlockReaderRemote2 implements BlockReader {
static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class);
static final Logger LOG = LoggerFactory.getLogger(BlockReaderRemote2.class);
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB;
final private Peer peer;
@ -138,7 +140,7 @@ public synchronized int read(byte[] buf, int off, int len)
if (curDataSlice == null ||
curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
try (TraceScope ignored = tracer.newScope(
"RemoteBlockReader2#readNextPacket(" + blockId + ")")) {
"BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
readNextPacket();
}
}
@ -162,7 +164,7 @@ public synchronized int read(ByteBuffer buf) throws IOException {
if (curDataSlice == null ||
(curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
try (TraceScope ignored = tracer.newScope(
"RemoteBlockReader2#readNextPacket(" + blockId + ")")) {
"BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
readNextPacket();
}
}
@ -273,7 +275,7 @@ private void readTrailingEmptyPacket() throws IOException {
}
}
protected RemoteBlockReader2(String file, long blockId,
protected BlockReaderRemote2(String file, long blockId,
DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
@ -425,7 +427,7 @@ public static BlockReader newBlockReader(String file,
startOffset + " for file " + file);
}
return new RemoteBlockReader2(file, block.getBlockId(), checksum,
return new BlockReaderRemote2(file, block.getBlockId(), checksum,
verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
peerCache, tracer, networkDistance);
}

View File

@ -15,9 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.BlockReader;
import java.io.IOException;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -23,6 +23,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.ReplicaAccessor;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
/**

View File

@ -133,7 +133,7 @@
<!-- Don't complain about LocalDatanodeInfo's anonymous class -->
<Match>
<Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" />
<Class name="org.apache.hadoop.hdfs.client.impl.BlockReaderLocal$LocalDatanodeInfo$1" />
<Bug pattern="SE_BAD_FIELD_INNER_CLASS" />
</Match>
<!-- Only one method increments numFailedVolumes and it is synchronized -->

View File

@ -43,7 +43,7 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;

View File

@ -38,7 +38,7 @@
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
@ -48,7 +48,6 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

View File

@ -26,6 +26,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.junit.Assert;
import org.junit.Test;

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.junit.Test;
/**

View File

@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -32,6 +32,15 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -225,7 +234,7 @@ public DataNode getDataNode(LocatedBlock testBlock) {
int ipcport = nodes[0].getIpcPort();
return cluster.getDataNode(ipcport);
}
public static void enableHdfsCachingTracing() {
LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
Level.TRACE);

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import static org.junit.Assert.assertEquals;
@ -24,6 +24,9 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.junit.After;
import org.junit.Before;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
@ -41,6 +41,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -125,7 +130,7 @@ public void testFallbackFromShortCircuitToUnixDomainTraffic()
cluster.shutdown();
sockDir.close();
}
/**
* Test the case where we have multiple threads waiting on the
* ShortCircuitCache delivering a certain ShortCircuitReplica.
@ -200,7 +205,7 @@ public void run() {
* Test the case where we have a failure to complete a short circuit read
* that occurs, and then later on, we have a success.
* Any thread waiting on a cache load should receive the failure (if it
* occurs); however, the failure result should not be cached. We want
* occurs); however, the failure result should not be cached. We want
* to be able to retry later and succeed.
*/
@Test(timeout=60000)
@ -244,7 +249,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
public void run() {
try {
// First time should fail.
List<LocatedBlock> locatedBlocks =
List<LocatedBlock> locatedBlocks =
cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
LocatedBlock lblock = locatedBlocks.get(0); // first block
@ -253,7 +258,7 @@ public void run() {
blockReader = BlockReaderTestUtil.
getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
Assert.fail("expected getBlockReader to fail the first time.");
} catch (Throwable t) {
} catch (Throwable t) {
Assert.assertTrue("expected to see 'TCP reads were disabled " +
"for testing' in exception " + t, t.getMessage().contains(
"TCP reads were disabled for testing"));
@ -267,7 +272,7 @@ public void run() {
try {
blockReader = BlockReaderTestUtil.
getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
} catch (Throwable t) {
} catch (Throwable t) {
LOG.error("error trying to retrieve a block reader " +
"the second time.", t);
throw t;
@ -327,7 +332,7 @@ public void testShortCircuitReadFromServerWithoutShm() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache =
fs.dfs.getClientContext().getShortCircuitCache();
fs.getClient().getClientContext().getShortCircuitCache();
final DatanodeInfo datanode =
new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
cache.getDfsClientShmManager().visit(new Visitor() {
@ -344,7 +349,7 @@ public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
cluster.shutdown();
sockDir.close();
}
/**
* Test that a client which does not support short-circuit reads using
* shared memory can talk with a server which supports it.
@ -375,12 +380,12 @@ public void testShortCircuitReadFromClientWithoutShm() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache =
fs.dfs.getClientContext().getShortCircuitCache();
fs.getClient().getClientContext().getShortCircuitCache();
Assert.assertEquals(null, cache.getDfsClientShmManager());
cluster.shutdown();
sockDir.close();
}
/**
* Test shutting down the ShortCircuitCache while there are things in it.
*/
@ -407,7 +412,7 @@ public void testShortCircuitCacheShutdown() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache =
fs.dfs.getClientContext().getShortCircuitCache();
fs.getClient().getClientContext().getShortCircuitCache();
cache.close();
Assert.assertTrue(cache.getDfsClientShmManager().
getDomainSocketWatcher().isClosed());

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import static org.hamcrest.CoreMatchers.equalTo;
@ -32,9 +32,15 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
@ -55,24 +61,24 @@
public class TestBlockReaderLocal {
private static TemporarySocketDirectory sockDir;
@BeforeClass
public static void init() {
sockDir = new TemporarySocketDirectory();
DomainSocket.disableBindPathValidation();
}
@AfterClass
public static void shutdown() throws IOException {
sockDir.close();
}
public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
int off2, int len) {
for (int i = 0; i < len; i++) {
if (buf1[off1 + i] != buf2[off2 + i]) {
Assert.fail("arrays differ at byte " + i + ". " +
"The first array has " + (int)buf1[off1 + i] +
Assert.fail("arrays differ at byte " + i + ". " +
"The first array has " + (int)buf1[off1 + i] +
", but the second array has " + (int)buf2[off2 + i]);
}
}
@ -85,7 +91,7 @@ public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
* @param buf The ByteBuffer to read into
* @param off The offset in the buffer to read into
* @param len The number of bytes to read.
*
*
* @throws IOException If it could not read the requested number of bytes
*/
private static void readFully(BlockReaderLocal reader,
@ -120,7 +126,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
// default: no-op
}
}
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
boolean checksum, long readahead) throws IOException {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
@ -139,7 +145,7 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
BlockReaderLocal blockReaderLocal = null;
FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
FileSystem fs = null;
ShortCircuitShm shm = null;
RandomAccessFile raf = null;
@ -186,7 +192,7 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
raf.setLength(8192);
FileInputStream shmStream = new FileInputStream(raf.getFD());
shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
ShortCircuitReplica replica =
ShortCircuitReplica replica =
new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
Time.now(), shm.allocAndRegisterSlot(
ExtendedBlockId.fromExtendedBlock(block)));
@ -216,21 +222,21 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
if (raf != null) raf.close();
}
}
private static class TestBlockReaderLocalImmediateClose
private static class TestBlockReaderLocalImmediateClose
extends BlockReaderLocalTest {
}
@Test
public void testBlockReaderLocalImmediateClose() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
}
private static class TestBlockReaderSimpleReads
private static class TestBlockReaderSimpleReads
extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte original[])
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
byte buf[] = new byte[TEST_LENGTH];
reader.readFully(buf, 0, 512);
@ -246,7 +252,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
}
}
@Test
public void testBlockReaderSimpleReads() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
@ -275,11 +281,11 @@ public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
}
private static class TestBlockReaderLocalArrayReads2
private static class TestBlockReaderLocalArrayReads2
extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte original[])
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
byte buf[] = new byte[TEST_LENGTH];
reader.readFully(buf, 0, 10);
@ -296,7 +302,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
assertArrayRegionsEqual(original, 1716, buf, 1716, 5);
}
}
@Test
public void testBlockReaderLocalArrayReads2() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
@ -322,10 +328,10 @@ public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
}
private static class TestBlockReaderLocalByteBufferReads
private static class TestBlockReaderLocalByteBufferReads
extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte original[])
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
readFully(reader, buf, 0, 10);
@ -339,7 +345,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
}
}
@Test
public void testBlockReaderLocalByteBufferReads()
throws IOException {
@ -354,7 +360,7 @@ public void testBlockReaderLocalByteBufferReadsNoChecksum()
new TestBlockReaderLocalByteBufferReads(),
false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalByteBufferReadsNoReadahead()
throws IOException {
@ -373,7 +379,7 @@ public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
* Test reads that bypass the bounce buffer (because they are aligned
* and bigger than the readahead).
*/
private static class TestBlockReaderLocalByteBufferFastLaneReads
private static class TestBlockReaderLocalByteBufferFastLaneReads
extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte original[])
@ -410,7 +416,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
50);
}
}
@Test
public void testBlockReaderLocalByteBufferFastLaneReads()
throws IOException {
@ -456,7 +462,7 @@ public void setup(File blockFile, boolean usingChecksums)
}
}
public void doTest(BlockReaderLocal reader, byte original[])
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
byte buf[] = new byte[TEST_LENGTH];
if (usingChecksums) {
@ -471,19 +477,19 @@ public void doTest(BlockReaderLocal reader, byte original[])
}
}
}
@Test
public void testBlockReaderLocalReadCorruptStart()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
private static class TestBlockReaderLocalReadCorrupt
extends BlockReaderLocalTest {
boolean usingChecksums = false;
@Override
public void setup(File blockFile, boolean usingChecksums)
public void setup(File blockFile, boolean usingChecksums)
throws IOException {
RandomAccessFile bf = null;
this.usingChecksums = usingChecksums;
@ -496,7 +502,7 @@ public void setup(File blockFile, boolean usingChecksums)
}
}
public void doTest(BlockReaderLocal reader, byte original[])
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
byte buf[] = new byte[TEST_LENGTH];
try {
@ -522,7 +528,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
}
}
}
@Test
public void testBlockReaderLocalReadCorrupt()
throws IOException {
@ -555,7 +561,7 @@ private static class TestBlockReaderLocalWithMlockChanges
public void setup(File blockFile, boolean usingChecksums)
throws IOException {
}
@Override
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
@ -675,7 +681,7 @@ public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
false, 0);
}
@Test
public void testBlockReaderLocalReadZeroBytes()
throws IOException {
@ -703,7 +709,7 @@ public void testBlockReaderLocalReadZeroBytesNoChecksumNoReadahead()
runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
false, 0);
}
@Test(timeout=60000)
public void TestStatisticsForShortCircuitLocalRead() throws Exception {
@ -714,7 +720,7 @@ public void TestStatisticsForShortCircuitLocalRead() throws Exception {
public void TestStatisticsForLocalRead() throws Exception {
testStatistics(false);
}
private void testStatistics(boolean isShortCircuit) throws Exception {
Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
HdfsConfiguration conf = new HdfsConfiguration();
@ -756,12 +762,12 @@ private void testStatistics(boolean isShortCircuit) throws Exception {
IOUtils.readFully(fsIn, original, 0,
BlockReaderLocalTest.TEST_LENGTH);
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
dfsIn.getReadStatistics().getTotalBytesRead());
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
dfsIn.getReadStatistics().getTotalLocalBytesRead());
if (isShortCircuit) {
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
} else {
Assert.assertEquals(0,

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -30,6 +30,13 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@ -52,7 +59,7 @@ public static void setupCluster() throws IOException {
DFSInputStream.tcpReadsDisabledForTesting = true;
DomainSocket.disableBindPathValidation();
}
private static HdfsConfiguration getConfiguration(
TemporarySocketDirectory socketDir) throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
@ -84,7 +91,7 @@ private static HdfsConfiguration getConfiguration(
public void testStablePositionAfterCorruptRead() throws Exception {
final short REPL_FACTOR = 1;
final long FILE_LENGTH = 512L;
HdfsConfiguration conf = getConfiguration(null);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

View File

@ -15,11 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
public class TestRemoteBlockReader extends TestBlockReaderBase {
public class TestBlockReaderRemote extends TestBlockReaderBase {
HdfsConfiguration createConf() {
HdfsConfiguration conf = new HdfsConfiguration();

View File

@ -15,9 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
public class TestRemoteBlockReader2 extends TestBlockReaderBase {
import org.apache.hadoop.hdfs.HdfsConfiguration;
public class TestBlockReaderRemote2 extends TestBlockReaderBase {
HdfsConfiguration createConf() {
HdfsConfiguration conf = new HdfsConfiguration();
return conf;

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.client.impl;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@ -25,6 +25,7 @@
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.test.GenericTestUtils;
@ -41,7 +42,7 @@ public class TestClientBlockVerification {
static LocatedBlock testBlock = null;
static {
GenericTestUtils.setLogLevel(RemoteBlockReader2.LOG, Level.ALL);
GenericTestUtils.setLogLevel(BlockReaderRemote2.LOG, Level.ALL);
}
@BeforeClass
public static void setupCluster() throws Exception {
@ -57,7 +58,7 @@ public static void setupCluster() throws Exception {
*/
@Test
public void testBlockVerification() throws Exception {
RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
@ -69,7 +70,7 @@ public void testBlockVerification() throws Exception {
*/
@Test
public void testIncompleteRead() throws Exception {
RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
@ -87,7 +88,7 @@ public void testIncompleteRead() throws Exception {
@Test
public void testCompletePartialRead() throws Exception {
// Ask for half the file
RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
// And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
@ -107,7 +108,7 @@ public void testUnalignedReads() throws Exception {
for (int length : lengths) {
DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
" len=" + length);
RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);

View File

@ -36,7 +36,7 @@
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;

View File

@ -42,7 +42,7 @@
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;

View File

@ -43,7 +43,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -59,7 +59,6 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;

View File

@ -27,7 +27,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;

View File

@ -55,12 +55,11 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;

View File

@ -39,8 +39,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;

View File

@ -45,7 +45,7 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestBlockReaderLocal;
import org.apache.hadoop.hdfs.client.impl.TestBlockReaderLocal;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@ -54,7 +54,6 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;