HDFS-8798. Erasure Coding: fix DFSStripedInputStream/DFSStripedOutputStream re-fetch token when expired. Contributed by Walter Su.

This commit is contained in:
Jing Zhao 2015-07-24 13:52:50 -07:00
parent c2c26e6ea7
commit 95b499a367
8 changed files with 159 additions and 154 deletions

View File

@ -373,3 +373,6 @@
HDFS-8813. Erasure Coding: Client no need to decode missing parity blocks. HDFS-8813. Erasure Coding: Client no need to decode missing parity blocks.
(Walter Su via jing9) (Walter Su via jing9)
HDFS-8798. Erasure Coding: fix DFSStripedInputStream/DFSStripedOutputStream
re-fetch token when expired. (Walter Su via jing9)

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -44,7 +43,6 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
@ -206,44 +204,6 @@ public class DFSStripedInputStream extends DFSInputStream {
currentLocatedBlock = targetBlockGroup; currentLocatedBlock = targetBlockGroup;
} }
/**
* @throws IOException only when failing to refetch block token, which happens
* when this client cannot get located block information from NameNode. This
* method returns null instead of throwing exception when failing to connect
* to the DataNode.
*/
private BlockReader getBlockReaderWithRetry(LocatedBlock targetBlock,
long offsetInBlock, long length, InetSocketAddress targetAddr,
StorageType storageType, DatanodeInfo datanode, long offsetInFile,
ReaderRetryPolicy retry) throws IOException {
// only need to get a new access token or a new encryption key once
while (true) {
try {
return getBlockReader(targetBlock, offsetInBlock, length, targetAddr,
storageType, datanode);
} catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException &&
retry.shouldRefetchEncryptionKey()) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + e);
dfsClient.clearDataEncryptionKey();
retry.refetchEncryptionKey();
} else if (retry.shouldRefetchToken() &&
tokenRefetchNeeded(e, targetAddr)) {
fetchBlockAt(offsetInFile);
retry.refetchToken();
} else {
DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+ ", add to deadNodes and continue.", e);
// Put chosen node into dead list, continue
addToDeadNodes(datanode);
return null;
}
}
}
}
/** /**
* Extend the super method with the logic of switching between cells. * Extend the super method with the logic of switching between cells.
* When reaching the end of a cell, proceed to the next cell and read it * When reaching the end of a cell, proceed to the next cell and read it
@ -293,13 +253,13 @@ public class DFSStripedInputStream extends DFSInputStream {
final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen); final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize() final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
- (stripeIndex * stripeLen), stripeLen); - (stripeIndex * stripeLen), stripeLen);
curStripeRange = new StripeRange(offsetInBlockGroup, StripeRange stripeRange = new StripeRange(offsetInBlockGroup,
stripeLimit - stripeBufOffset); stripeLimit - stripeBufOffset);
LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock; LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, cellSize, AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, cellSize,
blockGroup, offsetInBlockGroup, blockGroup, offsetInBlockGroup,
offsetInBlockGroup + curStripeRange.length - 1, curStripeBuf); offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
blockGroup, cellSize, dataBlkNum, parityBlkNum); blockGroup, cellSize, dataBlkNum, parityBlkNum);
// read the whole stripe // read the whole stripe
@ -311,6 +271,7 @@ public class DFSStripedInputStream extends DFSInputStream {
} }
curStripeBuf.position(stripeBufOffset); curStripeBuf.position(stripeBufOffset);
curStripeBuf.limit(stripeLimit); curStripeBuf.limit(stripeLimit);
curStripeRange = stripeRange;
} }
private Callable<Void> readCells(final BlockReader reader, private Callable<Void> readCells(final BlockReader reader,
@ -423,7 +384,6 @@ public class DFSStripedInputStream extends DFSInputStream {
} }
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap = Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
failures = 0;
if (pos < getFileLength()) { if (pos < getFileLength()) {
try { try {
if (pos > blockEnd) { if (pos > blockEnd) {
@ -623,13 +583,46 @@ public class DFSStripedInputStream extends DFSInputStream {
boolean createBlockReader(LocatedBlock block, int chunkIndex) boolean createBlockReader(LocatedBlock block, int chunkIndex)
throws IOException { throws IOException {
DNAddrPair dnInfo = getBestNodeDNAddrPair(block, null); BlockReader reader = null;
if (dnInfo != null) { final ReaderRetryPolicy retry = new ReaderRetryPolicy();
BlockReader reader = getBlockReaderWithRetry(block, DNAddrPair dnInfo = new DNAddrPair(null, null, null);
alignedStripe.getOffsetInBlock(),
while(true) {
try {
// the cached block location might have been re-fetched, so always
// get it from cache.
block = refreshLocatedBlock(block);
targetBlocks[chunkIndex] = block;
// internal block has one location, just rule out the deadNodes
dnInfo = getBestNodeDNAddrPair(block, null);
if (dnInfo == null) {
break;
}
reader = getBlockReader(block, alignedStripe.getOffsetInBlock(),
block.getBlockSize() - alignedStripe.getOffsetInBlock(), block.getBlockSize() - alignedStripe.getOffsetInBlock(),
dnInfo.addr, dnInfo.storageType, dnInfo.info, dnInfo.addr, dnInfo.storageType, dnInfo.info);
block.getStartOffset(), new ReaderRetryPolicy()); } catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException &&
retry.shouldRefetchEncryptionKey()) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + dnInfo.addr
+ " : " + e);
dfsClient.clearDataEncryptionKey();
retry.refetchEncryptionKey();
} else if (retry.shouldRefetchToken() &&
tokenRefetchNeeded(e, dnInfo.addr)) {
fetchBlockAt(block.getStartOffset());
retry.refetchToken();
} else {
//TODO: handles connection issues
DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
"block" + block.getBlock(), e);
// re-fetch the block in case the block has been moved
fetchBlockAt(block.getStartOffset());
addToDeadNodes(dnInfo.info);
}
}
if (reader != null) { if (reader != null) {
readerInfos[chunkIndex] = new BlockReaderInfo(reader, block, readerInfos[chunkIndex] = new BlockReaderInfo(reader, block,
dnInfo.info, alignedStripe.getOffsetInBlock()); dnInfo.info, alignedStripe.getOffsetInBlock());

View File

@ -195,12 +195,15 @@ public class StripedDataStreamer extends DataStreamer {
final ExtendedBlock bg = coordinator.getBlockGroup(); final ExtendedBlock bg = coordinator.getBlockGroup();
final LocatedBlock updated = callUpdateBlockForPipeline(bg); final LocatedBlock updated = callUpdateBlockForPipeline(bg);
final long newGS = updated.getBlock().getGenerationStamp(); final long newGS = updated.getBlock().getGenerationStamp();
final LocatedBlock[] updatedBlks = StripedBlockUtil
.parseStripedBlockGroup((LocatedStripedBlock) updated,
BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock(); final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
if (bi != null) { if (bi != null) {
final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS), final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
null, null, null, -1, updated.isCorrupt(), null); null, null, null, -1, updated.isCorrupt(), null);
lb.setBlockToken(updated.getBlockToken()); lb.setBlockToken(updatedBlks[i].getBlockToken());
newBlocks.offer(i, lb); newBlocks.offer(i, lb);
} else { } else {
final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i); final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i);

View File

@ -119,8 +119,8 @@ public class StripedBlockUtil {
bg.getStartOffset(), bg.isCorrupt(), null); bg.getStartOffset(), bg.isCorrupt(), null);
} }
Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens(); Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens();
if (idxInBlockGroup < blockTokens.length) { if (idxInReturnedLocs < blockTokens.length) {
locatedBlock.setBlockToken(blockTokens[idxInBlockGroup]); locatedBlock.setBlockToken(blockTokens[idxInReturnedLocs]);
} }
return locatedBlock; return locatedBlock;
} }

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;

View File

@ -25,22 +25,27 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -63,17 +68,13 @@ public class TestDFSStripedOutputStreamWithFailure {
private static final int FLUSH_POS private static final int FLUSH_POS
= 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1; = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
private final HdfsConfiguration conf = new HdfsConfiguration();
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
private DistributedFileSystem dfs; private DistributedFileSystem dfs;
private final Path dir = new Path("/" private final Path dir = new Path("/"
+ TestDFSStripedOutputStreamWithFailure.class.getSimpleName()); + TestDFSStripedOutputStreamWithFailure.class.getSimpleName());
private void setup(Configuration conf) throws IOException {
@Before
public void setup() throws IOException {
final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.waitActive(); cluster.waitActive();
dfs = cluster.getFileSystem(); dfs = cluster.getFileSystem();
@ -81,8 +82,7 @@ public class TestDFSStripedOutputStreamWithFailure {
dfs.createErasureCodingZone(dir, null, 0); dfs.createErasureCodingZone(dir, null, 0);
} }
@After private void tearDown() {
public void tearDown() {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
@ -92,89 +92,76 @@ public class TestDFSStripedOutputStreamWithFailure {
return (byte)pos; return (byte)pos;
} }
@Test(timeout=120000) private void initConf(Configuration conf){
public void testDatanodeFailure0() { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
final int dn = 0;
runTest("file" + dn, length, dn);
} }
@Test(timeout=120000) private void initConfWithBlockToken(Configuration conf) {
public void testDatanodeFailure1() { conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); conf.setInt("ipc.client.connect.max.retries", 0);
final int dn = 1; // Set short retry timeouts so this test runs faster
runTest("file" + dn, length, dn); conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
} }
@Test(timeout=120000) @Test(timeout=240000)
public void testDatanodeFailure2() { public void testDatanodeFailure() throws Exception {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 2; HdfsConfiguration conf = new HdfsConfiguration();
runTest("file" + dn, length, dn); initConf(conf);
} for (int dn = 0; dn < 9; dn++) {
@Test(timeout=120000)
public void testDatanodeFailure3() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 3;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure4() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 4;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure5() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 5;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure6() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 6;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure7() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 7;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure8() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 8;
runTest("file" + dn, length, dn);
}
private void runTest(final String src, final int length, final int dnIndex) {
try { try {
setup(conf);
cluster.startDataNodes(conf, 1, true, null, null); cluster.startDataNodes(conf, 1, true, null, null);
cluster.waitActive(); cluster.waitActive();
runTest(new Path(dir, "file" + dn), length, length / 2, dn, false);
} catch (Exception e) {
LOG.error("failed, dn=" + dn + ", length=" + length);
throw e;
} finally {
tearDown();
}
}
}
runTest(new Path(dir, src), length, length/2, dnIndex); @Test(timeout=240000)
} catch(Exception e) { public void testBlockTokenExpired() throws Exception {
LOG.info("FAILED", e); final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
Assert.fail(StringUtils.stringifyException(e)); HdfsConfiguration conf = new HdfsConfiguration();
initConf(conf);
initConfWithBlockToken(conf);
for (int dn = 0; dn < 9; dn += 2) {
try {
setup(conf);
cluster.startDataNodes(conf, 1, true, null, null);
cluster.waitActive();
runTest(new Path(dir, "file" + dn), length, length / 2, dn, true);
} catch (Exception e) {
LOG.error("failed, dn=" + dn + ", length=" + length);
throw e;
} finally {
tearDown();
}
} }
} }
private void runTest(final Path p, final int length, final int killPos, private void runTest(final Path p, final int length, final int killPos,
final int dnIndex) throws Exception { final int dnIndex, final boolean tokenExpire) throws Exception {
LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos
+ ", dnIndex=" + dnIndex); + ", dnIndex=" + dnIndex);
Preconditions.checkArgument(killPos < length); Preconditions.checkArgument(killPos < length);
Preconditions.checkArgument(killPos > FLUSH_POS); Preconditions.checkArgument(killPos > FLUSH_POS);
final String fullPath = p.toString(); final String fullPath = p.toString();
final NameNode nn = cluster.getNameNode();
final BlockManager bm = nn.getNamesystem().getBlockManager();
final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
if (tokenExpire) {
// set a short token lifetime (1 second)
SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
}
final AtomicInteger pos = new AtomicInteger(); final AtomicInteger pos = new AtomicInteger();
final FSDataOutputStream out = dfs.create(p); final FSDataOutputStream out = dfs.create(p);
final DFSStripedOutputStream stripedOut final DFSStripedOutputStream stripedOut
@ -189,6 +176,11 @@ public class TestDFSStripedOutputStreamWithFailure {
Assert.assertTrue(oldGS != -1); Assert.assertTrue(oldGS != -1);
Assert.assertEquals(oldGS, gs); Assert.assertEquals(oldGS, gs);
if (tokenExpire) {
DFSTestUtil.flushInternal(stripedOut);
waitTokenExpires(out);
}
killDatanode(cluster, stripedOut, dnIndex, pos); killDatanode(cluster, stripedOut, dnIndex, pos);
killed = true; killed = true;
} }
@ -348,4 +340,14 @@ public class TestDFSStripedOutputStreamWithFailure {
killedDnIndex - dataBlockBytes.length); killedDnIndex - dataBlockBytes.length);
} }
} }
private void waitTokenExpires(FSDataOutputStream out) throws IOException {
Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
while (!SecurityTestUtil.isBlockTokenExpired(token)) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}
}
} }

View File

@ -1469,10 +1469,19 @@ public class TestBalancer {
} }
} }
public void integrationTestWithStripedFile(Configuration conf) throws Exception {
initConfWithStripe(conf);
doTestBalancerWithStripedFile(conf);
}
@Test(timeout = 100000) @Test(timeout = 100000)
public void testBalancerWithStripedFile() throws Exception { public void testBalancerWithStripedFile() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
initConfWithStripe(conf); initConfWithStripe(conf);
doTestBalancerWithStripedFile(conf);
}
private void doTestBalancerWithStripedFile(Configuration conf) throws Exception {
int numOfDatanodes = dataBlocks + parityBlocks + 2; int numOfDatanodes = dataBlocks + parityBlocks + 2;
int numOfRacks = dataBlocks; int numOfRacks = dataBlocks;
long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE; long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE;

View File

@ -20,13 +20,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
@ -46,22 +44,6 @@ public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
FILE_SIZE = BLOCK_SIZE * dataBlocks * 3; FILE_SIZE = BLOCK_SIZE * dataBlocks * 3;
} }
@Before
public void setup() throws IOException {
conf = getConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient()
.createErasureCodingZone("/", null, cellSize);
cluster.waitActive();
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
private Configuration getConf() { private Configuration getConf() {
Configuration conf = super.getConf(numDNs); Configuration conf = super.getConf(numDNs);
conf.setInt("io.bytes.per.checksum", cellSize); conf.setInt("io.bytes.per.checksum", cellSize);
@ -71,14 +53,26 @@ public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
@Test @Test
@Override @Override
public void testRead() throws Exception { public void testRead() throws Exception {
//TODO: DFSStripedInputStream handles token expiration conf = getConf();
// doTestRead(conf, cluster, true); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient()
.createErasureCodingZone("/", null, cellSize);
try {
cluster.waitActive();
doTestRead(conf, cluster, true);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
} }
/**
* tested at {@link org.apache.hadoop.hdfs.TestDFSStripedOutputStreamWithFailure#testBlockTokenExpired()}
*/
@Test @Test
@Override @Override
public void testWrite() throws Exception { public void testWrite(){
//TODO: DFSStripedOutputStream handles token expiration
} }
@Test @Test
@ -90,7 +84,9 @@ public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
@Test @Test
@Override @Override
public void testEnd2End() throws Exception { public void testEnd2End() throws Exception {
//TODO: DFSStripedOutputStream handles token expiration Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
new TestBalancer().integrationTestWithStripedFile(conf);
} }
@Override @Override