HDFS-8216. TestDFSStripedOutputStream should use BlockReaderTestUtil to create BlockReader. Contributed by Tsz Wo Nicholas Sze.
This commit is contained in:
parent
e107886d6f
commit
e9d85bbf30
|
@ -107,3 +107,6 @@
|
||||||
|
|
||||||
HDFS-8190. StripedBlockUtil.getInternalBlockLength may have overflow error.
|
HDFS-8190. StripedBlockUtil.getInternalBlockLength may have overflow error.
|
||||||
(szetszwo)
|
(szetszwo)
|
||||||
|
|
||||||
|
HDFS-8216. TestDFSStripedOutputStream should use BlockReaderTestUtil to
|
||||||
|
create BlockReader. (szetszwo via Zhe Zhang)
|
||||||
|
|
|
@ -165,20 +165,19 @@ public class BlockReaderTestUtil {
|
||||||
*/
|
*/
|
||||||
public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
|
public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return getBlockReader(cluster, testBlock, offset, lenToRead);
|
return getBlockReader(cluster.getFileSystem(), testBlock, offset, lenToRead);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a BlockReader for the given block.
|
* Get a BlockReader for the given block.
|
||||||
*/
|
*/
|
||||||
public static BlockReader getBlockReader(MiniDFSCluster cluster,
|
public static BlockReader getBlockReader(final DistributedFileSystem fs,
|
||||||
LocatedBlock testBlock, int offset, int lenToRead) throws IOException {
|
LocatedBlock testBlock, int offset, long lenToRead) throws IOException {
|
||||||
InetSocketAddress targetAddr = null;
|
InetSocketAddress targetAddr = null;
|
||||||
ExtendedBlock block = testBlock.getBlock();
|
ExtendedBlock block = testBlock.getBlock();
|
||||||
DatanodeInfo[] nodes = testBlock.getLocations();
|
DatanodeInfo[] nodes = testBlock.getLocations();
|
||||||
targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
|
targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
|
||||||
|
|
||||||
final DistributedFileSystem fs = cluster.getFileSystem();
|
|
||||||
return new BlockReaderFactory(fs.getClient().getConf()).
|
return new BlockReaderFactory(fs.getClient().getConf()).
|
||||||
setInetSocketAddress(targetAddr).
|
setInetSocketAddress(targetAddr).
|
||||||
setBlock(block).
|
setBlock(block).
|
||||||
|
|
|
@ -250,8 +250,8 @@ public class TestBlockReaderFactory {
|
||||||
LocatedBlock lblock = locatedBlocks.get(0); // first block
|
LocatedBlock lblock = locatedBlocks.get(0); // first block
|
||||||
BlockReader blockReader = null;
|
BlockReader blockReader = null;
|
||||||
try {
|
try {
|
||||||
blockReader = BlockReaderTestUtil.
|
blockReader = BlockReaderTestUtil.getBlockReader(
|
||||||
getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
|
cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
|
||||||
Assert.fail("expected getBlockReader to fail the first time.");
|
Assert.fail("expected getBlockReader to fail the first time.");
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
Assert.assertTrue("expected to see 'TCP reads were disabled " +
|
Assert.assertTrue("expected to see 'TCP reads were disabled " +
|
||||||
|
@ -265,8 +265,8 @@ public class TestBlockReaderFactory {
|
||||||
|
|
||||||
// Second time should succeed.
|
// Second time should succeed.
|
||||||
try {
|
try {
|
||||||
blockReader = BlockReaderTestUtil.
|
blockReader = BlockReaderTestUtil.getBlockReader(
|
||||||
getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
|
cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.error("error trying to retrieve a block reader " +
|
LOG.error("error trying to retrieve a block reader " +
|
||||||
"the second time.", t);
|
"the second time.", t);
|
||||||
|
@ -474,8 +474,8 @@ public class TestBlockReaderFactory {
|
||||||
while (true) {
|
while (true) {
|
||||||
BlockReader blockReader = null;
|
BlockReader blockReader = null;
|
||||||
try {
|
try {
|
||||||
blockReader = BlockReaderTestUtil.
|
blockReader = BlockReaderTestUtil.getBlockReader(
|
||||||
getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
|
cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
|
||||||
sem.release();
|
sem.release();
|
||||||
try {
|
try {
|
||||||
blockReader.readAll(buf, 0, TEST_FILE_LEN);
|
blockReader.readAll(buf, 0, TEST_FILE_LEN);
|
||||||
|
@ -514,8 +514,8 @@ public class TestBlockReaderFactory {
|
||||||
// getting a ClosedChannelException.
|
// getting a ClosedChannelException.
|
||||||
BlockReader blockReader = null;
|
BlockReader blockReader = null;
|
||||||
try {
|
try {
|
||||||
blockReader = BlockReaderTestUtil.
|
blockReader = BlockReaderTestUtil.getBlockReader(
|
||||||
getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
|
cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
|
||||||
blockReader.readFully(buf, 0, TEST_FILE_LEN);
|
blockReader.readFully(buf, 0, TEST_FILE_LEN);
|
||||||
} finally {
|
} finally {
|
||||||
if (blockReader != null) blockReader.close();
|
if (blockReader != null) blockReader.close();
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.Socket;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -29,25 +27,14 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
|
||||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
|
||||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
|
||||||
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
|
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
|
||||||
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
|
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -59,7 +46,6 @@ public class TestDFSStripedOutputStream {
|
||||||
private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
|
private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
|
||||||
|
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
private Configuration conf = new Configuration();
|
|
||||||
private DistributedFileSystem fs;
|
private DistributedFileSystem fs;
|
||||||
private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
|
private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
|
||||||
private final int stripesPerBlock = 4;
|
private final int stripesPerBlock = 4;
|
||||||
|
@ -174,6 +160,10 @@ public class TestDFSStripedOutputStream {
|
||||||
FileStatus status = fs.getFileStatus(testPath);
|
FileStatus status = fs.getFileStatus(testPath);
|
||||||
Assert.assertEquals(writeBytes, status.getLen());
|
Assert.assertEquals(writeBytes, status.getLen());
|
||||||
|
|
||||||
|
checkData(src, writeBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void checkData(String src, int writeBytes) throws IOException {
|
||||||
List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
|
List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
|
||||||
LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
|
LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
|
||||||
|
|
||||||
|
@ -199,11 +189,7 @@ public class TestDFSStripedOutputStream {
|
||||||
if (lblock == null) {
|
if (lblock == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
DatanodeInfo[] nodes = lblock.getLocations();
|
|
||||||
ExtendedBlock block = lblock.getBlock();
|
ExtendedBlock block = lblock.getBlock();
|
||||||
InetSocketAddress targetAddr = NetUtils.createSocketAddr(
|
|
||||||
nodes[0].getXferAddr());
|
|
||||||
|
|
||||||
byte[] blockBytes = new byte[(int)block.getNumBytes()];
|
byte[] blockBytes = new byte[(int)block.getNumBytes()];
|
||||||
if (i < dataBlocks) {
|
if (i < dataBlocks) {
|
||||||
dataBlockBytes[i] = blockBytes;
|
dataBlockBytes[i] = blockBytes;
|
||||||
|
@ -215,40 +201,8 @@ public class TestDFSStripedOutputStream {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
|
final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
|
||||||
setFileName(src).
|
fs, lblock, 0, block.getNumBytes());
|
||||||
setBlock(block).
|
|
||||||
setBlockToken(lblock.getBlockToken()).
|
|
||||||
setInetSocketAddress(targetAddr).
|
|
||||||
setStartOffset(0).
|
|
||||||
setLength(block.getNumBytes()).
|
|
||||||
setVerifyChecksum(true).
|
|
||||||
setClientName("TestStripeLayoutWrite").
|
|
||||||
setDatanodeInfo(nodes[0]).
|
|
||||||
setCachingStrategy(CachingStrategy.newDefaultStrategy()).
|
|
||||||
setClientCacheContext(ClientContext.getFromConf(conf)).
|
|
||||||
setConfiguration(conf).
|
|
||||||
setRemotePeerFactory(new RemotePeerFactory() {
|
|
||||||
@Override
|
|
||||||
public Peer newConnectedPeer(InetSocketAddress addr,
|
|
||||||
Token<BlockTokenIdentifier> blockToken,
|
|
||||||
DatanodeID datanodeId)
|
|
||||||
throws IOException {
|
|
||||||
Peer peer = null;
|
|
||||||
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
|
|
||||||
try {
|
|
||||||
sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
|
|
||||||
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
|
|
||||||
peer = TcpPeerServer.peerFromSocket(sock);
|
|
||||||
} finally {
|
|
||||||
if (peer == null) {
|
|
||||||
IOUtils.closeSocket(sock);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return peer;
|
|
||||||
}
|
|
||||||
}).build();
|
|
||||||
|
|
||||||
blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
|
blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
|
||||||
blockReader.close();
|
blockReader.close();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue