HDFS-9251. Refactor TestWriteToReplica and TestFsDatasetImpl to avoid explicitly creating Files in the tests code. (lei)

(cherry picked from commit 71e533a153)
This commit is contained in:
Lei Xu 2015-10-20 10:08:02 -07:00
parent 03bb40612e
commit 7a8ed154ac
5 changed files with 231 additions and 118 deletions

View File

@ -709,6 +709,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9250. Add Precondition check to LocatedBlock#addCachedLoc. HDFS-9250. Add Precondition check to LocatedBlock#addCachedLoc.
(Xiao Chen via wang) (Xiao Chen via wang)
HDFS-9251. Refactor TestWriteToReplica and TestFsDatasetImpl to avoid
explicitly creating Files in the tests code. (lei)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@ -137,4 +138,58 @@ public interface FsDatasetTestUtils {
*/ */
MaterializedReplica getMaterializedReplica(ExtendedBlock block) MaterializedReplica getMaterializedReplica(ExtendedBlock block)
throws ReplicaNotFoundException; throws ReplicaNotFoundException;
/**
* Create a finalized replica and add it into the FsDataset.
*/
Replica createFinalizedReplica(ExtendedBlock block) throws IOException;
/**
* Create a finalized replica on a particular volume, and add it into
* the FsDataset.
*/
Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
throws IOException;
/**
* Create a {@link ReplicaInPipeline} and add it into the FsDataset.
*/
Replica createReplicaInPipeline(ExtendedBlock block) throws IOException;
/**
* Create a {@link ReplicaInPipeline} and add it into the FsDataset.
*/
Replica createReplicaInPipeline(FsVolumeSpi volume, ExtendedBlock block)
throws IOException;
/**
* Create a {@link ReplicaBeingWritten} and add it into the FsDataset.
*/
Replica createRBW(ExtendedBlock block) throws IOException;
/**
* Create a {@link ReplicaBeingWritten} on the particular volume, and add it
* into the FsDataset.
*/
Replica createRBW(FsVolumeSpi volume, ExtendedBlock block) throws IOException;
/**
* Create a {@link ReplicaWaitingToBeRecovered} object and add it into the
* FsDataset.
*/
Replica createReplicaWaitingToBeRecovered(ExtendedBlock block)
throws IOException;
/**
* Create a {@link ReplicaWaitingToBeRecovered} on the particular volume,
* and add it into the FsDataset.
*/
Replica createReplicaWaitingToBeRecovered(
FsVolumeSpi volume, ExtendedBlock block) throws IOException;
/**
* Create a {@link ReplicaUnderRecovery} object and add it into the FsDataset.
*/
Replica createReplicaUnderRecovery(ExtendedBlock block, long recoveryId)
throws IOException;
} }

View File

@ -23,10 +23,20 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils; import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@ -176,4 +186,103 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
blockFile, block.getGenerationStamp()); blockFile, block.getGenerationStamp());
return new FsDatasetImplMaterializedReplica(blockFile, metaFile); return new FsDatasetImplMaterializedReplica(blockFile, metaFile);
} }
@Override
public Replica createFinalizedReplica(ExtendedBlock block)
throws IOException {
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
return createFinalizedReplica(volumes.get(0), block);
}
}
@Override
public Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
throws IOException {
FsVolumeImpl vol = (FsVolumeImpl) volume;
ReplicaInfo info = new FinalizedReplica(block.getLocalBlock(), vol,
vol.getCurrentDir().getParentFile());
dataset.volumeMap.add(block.getBlockPoolId(), info);
info.getBlockFile().createNewFile();
info.getMetaFile().createNewFile();
return info;
}
@Override
public Replica createReplicaInPipeline(ExtendedBlock block)
throws IOException {
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
return createReplicaInPipeline(volumes.get(0), block);
}
}
@Override
public Replica createReplicaInPipeline(
FsVolumeSpi volume, ExtendedBlock block) throws IOException {
FsVolumeImpl vol = (FsVolumeImpl) volume;
ReplicaInPipeline rip = new ReplicaInPipeline(
block.getBlockId(), block.getGenerationStamp(), volume,
vol.createTmpFile(
block.getBlockPoolId(), block.getLocalBlock()).getParentFile(),
0);
dataset.volumeMap.add(block.getBlockPoolId(), rip);
return rip;
}
@Override
public Replica createRBW(ExtendedBlock eb) throws IOException {
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
return createRBW(volumes.get(0), eb);
}
}
@Override
public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
throws IOException {
FsVolumeImpl vol = (FsVolumeImpl) volume;
final String bpid = eb.getBlockPoolId();
final Block block = eb.getLocalBlock();
ReplicaBeingWritten rbw = new ReplicaBeingWritten(
eb.getLocalBlock(), volume,
vol.createRbwFile(bpid, block).getParentFile(), null);
rbw.getBlockFile().createNewFile();
rbw.getMetaFile().createNewFile();
dataset.volumeMap.add(bpid, rbw);
return rbw;
}
@Override
public Replica createReplicaWaitingToBeRecovered(ExtendedBlock eb)
throws IOException {
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
return createReplicaInPipeline(volumes.get(0), eb);
}
}
@Override
public Replica createReplicaWaitingToBeRecovered(
FsVolumeSpi volume, ExtendedBlock eb) throws IOException {
FsVolumeImpl vol = (FsVolumeImpl) volume;
final String bpid = eb.getBlockPoolId();
final Block block = eb.getLocalBlock();
ReplicaWaitingToBeRecovered rwbr =
new ReplicaWaitingToBeRecovered(eb.getLocalBlock(), volume,
vol.createRbwFile(bpid, block).getParentFile());
dataset.volumeMap.add(bpid, rwbr);
return rwbr;
}
@Override
public Replica createReplicaUnderRecovery(
ExtendedBlock block, long recoveryId) throws IOException {
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
ReplicaUnderRecovery rur = new ReplicaUnderRecovery(new FinalizedReplica(
block.getLocalBlock(), volume, volume.getCurrentDir().getParentFile()),
recoveryId
);
dataset.volumeMap.add(block.getBlockPoolId(), rur);
return rur;
}
}
} }

View File

@ -365,12 +365,14 @@ public class TestFsDatasetImpl {
@Test @Test
public void testDeletingBlocks() throws IOException { public void testDeletingBlocks() throws IOException {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try { try {
cluster.waitActive(); cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn); FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
ds.addBlockPool(BLOCKPOOL, conf);
FsVolumeImpl vol; FsVolumeImpl vol;
try (FsDatasetSpi.FsVolumeReferences volumes = ds.getFsVolumeReferences()) { try (FsDatasetSpi.FsVolumeReferences volumes = ds.getFsVolumeReferences()) {
vol = (FsVolumeImpl)volumes.get(0); vol = (FsVolumeImpl)volumes.get(0);
@ -378,15 +380,11 @@ public class TestFsDatasetImpl {
ExtendedBlock eb; ExtendedBlock eb;
ReplicaInfo info; ReplicaInfo info;
List<Block> blockList = new ArrayList<Block>(); List<Block> blockList = new ArrayList<>();
for (int i = 1; i <= 63; i++) { for (int i = 1; i <= 63; i++) {
eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i); eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i);
info = new FinalizedReplica( cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb);
eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile()); blockList.add(eb.getLocalBlock());
ds.volumeMap.add(BLOCKPOOL, info);
info.getBlockFile().createNewFile();
info.getMetaFile().createNewFile();
blockList.add(info);
} }
ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0])); ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
try { try {
@ -398,12 +396,8 @@ public class TestFsDatasetImpl {
blockList.clear(); blockList.clear();
eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064); eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064);
info = new FinalizedReplica( cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb);
eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile()); blockList.add(eb.getLocalBlock());
ds.volumeMap.add(BLOCKPOOL, info);
info.getBlockFile().createNewFile();
info.getMetaFile().createNewFile();
blockList.add(info);
ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0])); ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
try { try {
Thread.sleep(1000); Thread.sleep(1000);

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -35,16 +34,11 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -70,12 +64,12 @@ public class TestWriteToReplica {
try { try {
cluster.waitActive(); cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap // set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId(); String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, dataSet); ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
// test close // test close
testClose(dataSet, blocks); testClose(dataSet, blocks);
@ -91,11 +85,11 @@ public class TestWriteToReplica {
try { try {
cluster.waitActive(); cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap // set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId(); String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, dataSet); ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
// test append // test append
testAppend(bpid, dataSet, blocks); testAppend(bpid, dataSet, blocks);
@ -115,7 +109,7 @@ public class TestWriteToReplica {
// set up replicasMap // set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId(); String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, dataSet); ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
// test writeToRbw // test writeToRbw
testWriteToRbw(dataSet, blocks); testWriteToRbw(dataSet, blocks);
@ -135,7 +129,7 @@ public class TestWriteToReplica {
// set up replicasMap // set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId(); String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, dataSet); ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
// test writeToTemporary // test writeToTemporary
testWriteToTemporary(dataSet, blocks); testWriteToTemporary(dataSet, blocks);
@ -149,11 +143,12 @@ public class TestWriteToReplica {
* on which to run the tests. * on which to run the tests.
* *
* @param bpid Block pool ID to generate blocks for * @param bpid Block pool ID to generate blocks for
* @param dataSet Namespace in which to insert blocks * @param testUtils FsDatasetTestUtils provides white box access to FsDataset.
* @return Contrived blocks for further testing. * @return Contrived blocks for further testing.
* @throws IOException * @throws IOException
*/ */
private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOException { private ExtendedBlock[] setup(String bpid, FsDatasetTestUtils testUtils)
throws IOException {
// setup replicas map // setup replicas map
ExtendedBlock[] blocks = new ExtendedBlock[] { ExtendedBlock[] blocks = new ExtendedBlock[] {
@ -161,59 +156,36 @@ public class TestWriteToReplica {
new ExtendedBlock(bpid, 3, 1, 2003), new ExtendedBlock(bpid, 4, 1, 2004), new ExtendedBlock(bpid, 3, 1, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006) new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
}; };
ReplicaMap replicasMap = dataSet.volumeMap;
try (FsDatasetSpi.FsVolumeReferences references =
dataSet.getFsVolumeReferences()) {
FsVolumeImpl vol = (FsVolumeImpl) references.get(0);
ReplicaInfo replicaInfo = new FinalizedReplica(
blocks[FINALIZED].getLocalBlock(), vol,
vol.getCurrentDir().getParentFile());
replicasMap.add(bpid, replicaInfo);
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
replicasMap.add(bpid, new ReplicaInPipeline( testUtils.createFinalizedReplica(blocks[FINALIZED]);
blocks[TEMPORARY].getBlockId(), testUtils.createReplicaInPipeline(blocks[TEMPORARY]);
blocks[TEMPORARY].getGenerationStamp(), vol, testUtils.createRBW(blocks[RBW]);
vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()) testUtils.createReplicaWaitingToBeRecovered(blocks[RWR]);
.getParentFile(), 0)); testUtils.createReplicaUnderRecovery(blocks[RUR], 2007);
replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol,
vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(),
null);
replicasMap.add(bpid, replicaInfo);
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
replicasMap.add(bpid, new ReplicaWaitingToBeRecovered(
blocks[RWR].getLocalBlock(), vol, vol.createRbwFile(bpid,
blocks[RWR].getLocalBlock()).getParentFile()));
replicasMap
.add(bpid, new ReplicaUnderRecovery(new FinalizedReplica(blocks[RUR]
.getLocalBlock(), vol, vol.getCurrentDir().getParentFile()),
2007));
}
return blocks; return blocks;
} }
private void testAppend(String bpid, FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException { private void testAppend(String bpid, FsDatasetSpi<?> dataSet,
ExtendedBlock[] blocks) throws IOException {
long newGS = blocks[FINALIZED].getGenerationStamp()+1; long newGS = blocks[FINALIZED].getGenerationStamp()+1;
final FsVolumeImpl v = (FsVolumeImpl)dataSet.volumeMap.get( final FsVolumeSpi v = dataSet.getVolume(blocks[FINALIZED]);
bpid, blocks[FINALIZED].getLocalBlock()).getVolume(); if (v instanceof FsVolumeImpl) {
long available = v.getCapacity()-v.getDfsUsed(); FsVolumeImpl fvi = (FsVolumeImpl) v;
long expectedLen = blocks[FINALIZED].getNumBytes(); long available = fvi.getCapacity() - fvi.getDfsUsed();
try { long expectedLen = blocks[FINALIZED].getNumBytes();
v.onBlockFileDeletion(bpid, -available); try {
blocks[FINALIZED].setNumBytes(expectedLen+100); fvi.onBlockFileDeletion(bpid, -available);
dataSet.append(blocks[FINALIZED], newGS, expectedLen); blocks[FINALIZED].setNumBytes(expectedLen + 100);
Assert.fail("Should not have space to append to an RWR replica" + blocks[RWR]); dataSet.append(blocks[FINALIZED], newGS, expectedLen);
} catch (DiskOutOfSpaceException e) { Assert.fail("Should not have space to append to an RWR replica" + blocks[RWR]);
Assert.assertTrue(e.getMessage().startsWith( } catch (DiskOutOfSpaceException e) {
"Insufficient space for appending to ")); Assert.assertTrue(e.getMessage().startsWith(
"Insufficient space for appending to "));
}
fvi.onBlockFileDeletion(bpid, available);
blocks[FINALIZED].setNumBytes(expectedLen);
} }
v.onBlockFileDeletion(bpid, available);
blocks[FINALIZED].setNumBytes(expectedLen);
newGS = blocks[RBW].getGenerationStamp()+1; newGS = blocks[RBW].getGenerationStamp()+1;
dataSet.append(blocks[FINALIZED], newGS, dataSet.append(blocks[FINALIZED], newGS,
@ -317,7 +289,7 @@ public class TestWriteToReplica {
} }
} }
private void testClose(FsDatasetImpl dataSet, ExtendedBlock [] blocks) throws IOException { private void testClose(FsDatasetSpi<?> dataSet, ExtendedBlock [] blocks) throws IOException {
long newGS = blocks[FINALIZED].getGenerationStamp()+1; long newGS = blocks[FINALIZED].getGenerationStamp()+1;
dataSet.recoverClose(blocks[FINALIZED], newGS, dataSet.recoverClose(blocks[FINALIZED], newGS,
blocks[FINALIZED].getNumBytes()); // successful blocks[FINALIZED].getNumBytes()); // successful
@ -544,28 +516,27 @@ public class TestWriteToReplica {
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils. FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.
getFSDataset(dn); getFSDataset(dn);
ReplicaMap replicaMap = dataSet.volumeMap;
List<FsVolumeImpl> volumes = null; List<FsVolumeSpi> volumes = null;
try (FsDatasetSpi.FsVolumeReferences referredVols = dataSet.getFsVolumeReferences()) { try (FsDatasetSpi.FsVolumeReferences referredVols = dataSet.getFsVolumeReferences()) {
// number of volumes should be 2 - [data1, data2] // number of volumes should be 2 - [data1, data2]
assertEquals("number of volumes is wrong", 2, referredVols.size()); assertEquals("number of volumes is wrong", 2, referredVols.size());
volumes = new ArrayList<>(referredVols.size()); volumes = new ArrayList<>(referredVols.size());
for (FsVolumeSpi vol : referredVols) { for (FsVolumeSpi vol : referredVols) {
volumes.add((FsVolumeImpl) vol); volumes.add(vol);
} }
} }
ArrayList<String> bpList = new ArrayList<String>(Arrays.asList( ArrayList<String> bpList = new ArrayList<>(Arrays.asList(
cluster.getNamesystem(0).getBlockPoolId(), cluster.getNamesystem(0).getBlockPoolId(),
cluster.getNamesystem(1).getBlockPoolId())); cluster.getNamesystem(1).getBlockPoolId()));
Assert.assertTrue("Cluster should have 2 block pools", Assert.assertTrue("Cluster should have 2 block pools",
bpList.size() == 2); bpList.size() == 2);
createReplicas(bpList, volumes, replicaMap); createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
ReplicaMap oldReplicaMap = new ReplicaMap(this); ReplicaMap oldReplicaMap = new ReplicaMap(this);
oldReplicaMap.addAll(replicaMap); oldReplicaMap.addAll(dataSet.volumeMap);
cluster.restartDataNode(0); cluster.restartDataNode(0);
cluster.waitActive(); cluster.waitActive();
dn = cluster.getDataNodes().get(0); dn = cluster.getDataNodes().get(0);
@ -622,48 +593,29 @@ public class TestWriteToReplica {
} }
} }
private void createReplicas(List<String> bpList, List<FsVolumeImpl> volumes, private void createReplicas(List<String> bpList, List<FsVolumeSpi> volumes,
ReplicaMap volumeMap) throws IOException { FsDatasetTestUtils testUtils) throws IOException {
Assert.assertTrue("Volume map can't be null" , volumeMap != null);
// Here we create all different type of replicas and add it // Here we create all different type of replicas and add it
// to volume map. // to volume map.
// Created all type of ReplicaInfo, each under Blkpool corresponding volume // Created all type of ReplicaInfo, each under Blkpool corresponding volume
long id = 1; // This variable is used as both blockId and genStamp long id = 1; // This variable is used as both blockId and genStamp
for (String bpId: bpList) { for (String bpId: bpList) {
for (FsVolumeImpl volume: volumes) { for (FsVolumeSpi volume: volumes) {
ReplicaInfo finalizedReplica = new FinalizedReplica(id, 1, id, volume, ExtendedBlock eb = new ExtendedBlock(bpId, id, 1, id);
DatanodeUtil.idToBlockDir(volume.getFinalizedDir(bpId), id)); testUtils.createFinalizedReplica(volume, eb);
volumeMap.add(bpId, finalizedReplica);
id++;
ReplicaInfo rbwReplica = new ReplicaBeingWritten(id, 1, id, volume,
volume.getRbwDir(bpId), null, 100);
volumeMap.add(bpId, rbwReplica);
id++; id++;
ReplicaInfo rwrReplica = new ReplicaWaitingToBeRecovered(id, 1, id, eb = new ExtendedBlock(bpId, id, 1, id);
volume, volume.getRbwDir(bpId)); testUtils.createRBW(volume, eb);
volumeMap.add(bpId, rwrReplica);
id++; id++;
ReplicaInfo ripReplica = new ReplicaInPipeline(id, id, volume, eb = new ExtendedBlock(bpId, id, 1, id);
volume.getTmpDir(bpId), 0); testUtils.createReplicaWaitingToBeRecovered(volume, eb);
volumeMap.add(bpId, ripReplica); id++;
eb = new ExtendedBlock(bpId, id, 1, id);
testUtils.createReplicaInPipeline(volume, eb);
id++; id++;
}
}
for (String bpId: bpList) {
for (ReplicaInfo replicaInfo: volumeMap.replicas(bpId)) {
File parentFile = replicaInfo.getBlockFile().getParentFile();
if (!parentFile.exists()) {
if (!parentFile.mkdirs()) {
throw new IOException("Failed to mkdirs " + parentFile);
}
}
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
} }
} }
} }