HDFS-12091. [READ] Check that the replicas served from a ProvidedVolumeImpl belong to the correct external storage
This commit is contained in:
parent
546b95f484
commit
663b3c08b1
|
@ -64,21 +64,25 @@ public class StorageLocation
|
||||||
this.storageType = storageType;
|
this.storageType = storageType;
|
||||||
if (uri.getScheme() == null || uri.getScheme().equals("file")) {
|
if (uri.getScheme() == null || uri.getScheme().equals("file")) {
|
||||||
// make sure all URIs that point to a file have the same scheme
|
// make sure all URIs that point to a file have the same scheme
|
||||||
try {
|
uri = normalizeFileURI(uri);
|
||||||
File uriFile = new File(uri.getPath());
|
|
||||||
String uriStr = uriFile.toURI().normalize().toString();
|
|
||||||
if (uriStr.endsWith("/")) {
|
|
||||||
uriStr = uriStr.substring(0, uriStr.length() - 1);
|
|
||||||
}
|
|
||||||
uri = new URI(uriStr);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"URI: " + uri + " is not in the expected format");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
baseURI = uri;
|
baseURI = uri;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static URI normalizeFileURI(URI uri) {
|
||||||
|
try {
|
||||||
|
File uriFile = new File(uri.getPath());
|
||||||
|
String uriStr = uriFile.toURI().normalize().toString();
|
||||||
|
if (uriStr.endsWith("/")) {
|
||||||
|
uriStr = uriStr.substring(0, uriStr.length() - 1);
|
||||||
|
}
|
||||||
|
return new URI(uriStr);
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"URI: " + uri + " is not in the expected format");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public StorageType getStorageType() {
|
public StorageType getStorageType() {
|
||||||
return this.storageType;
|
return this.storageType;
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
|
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
|
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
|
||||||
|
@ -64,7 +65,7 @@ import org.apache.hadoop.util.Time;
|
||||||
public class ProvidedVolumeImpl extends FsVolumeImpl {
|
public class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||||
|
|
||||||
static class ProvidedBlockPoolSlice {
|
static class ProvidedBlockPoolSlice {
|
||||||
private FsVolumeImpl providedVolume;
|
private ProvidedVolumeImpl providedVolume;
|
||||||
|
|
||||||
private FileRegionProvider provider;
|
private FileRegionProvider provider;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
@ -89,13 +90,20 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||||
return provider;
|
return provider;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setFileRegionProvider(FileRegionProvider newProvider) {
|
||||||
|
this.provider = newProvider;
|
||||||
|
}
|
||||||
|
|
||||||
public void getVolumeMap(ReplicaMap volumeMap,
|
public void getVolumeMap(ReplicaMap volumeMap,
|
||||||
RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
|
RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
|
||||||
Iterator<FileRegion> iter = provider.iterator();
|
Iterator<FileRegion> iter = provider.iterator();
|
||||||
while(iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
FileRegion region = iter.next();
|
FileRegion region = iter.next();
|
||||||
if (region.getBlockPoolId() != null &&
|
if (region.getBlockPoolId() != null
|
||||||
region.getBlockPoolId().equals(bpid)) {
|
&& region.getBlockPoolId().equals(bpid)
|
||||||
|
&& containsBlock(providedVolume.baseURI,
|
||||||
|
region.getPath().toUri())) {
|
||||||
ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
|
ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
|
||||||
.setBlockId(region.getBlock().getBlockId())
|
.setBlockId(region.getBlock().getBlockId())
|
||||||
.setURI(region.getPath().toUri())
|
.setURI(region.getPath().toUri())
|
||||||
|
@ -103,17 +111,16 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||||
.setLength(region.getBlock().getNumBytes())
|
.setLength(region.getBlock().getNumBytes())
|
||||||
.setGenerationStamp(region.getBlock().getGenerationStamp())
|
.setGenerationStamp(region.getBlock().getGenerationStamp())
|
||||||
.setFsVolume(providedVolume)
|
.setFsVolume(providedVolume)
|
||||||
.setConf(conf).build();
|
.setConf(conf)
|
||||||
|
.build();
|
||||||
ReplicaInfo oldReplica =
|
// check if the replica already exists
|
||||||
volumeMap.get(bpid, newReplica.getBlockId());
|
ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
|
||||||
if (oldReplica == null) {
|
if (oldReplica == null) {
|
||||||
volumeMap.add(bpid, newReplica);
|
volumeMap.add(bpid, newReplica);
|
||||||
bpVolumeMap.add(bpid, newReplica);
|
bpVolumeMap.add(bpid, newReplica);
|
||||||
} else {
|
} else {
|
||||||
throw new IOException(
|
throw new IOException("A block with id " + newReplica.getBlockId()
|
||||||
"A block with id " + newReplica.getBlockId() +
|
+ " already exists in the volumeMap");
|
||||||
" already exists in the volumeMap");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -527,4 +534,42 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
"ProvidedVolume does not yet support writes");
|
"ProvidedVolume does not yet support writes");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static URI getAbsoluteURI(URI uri) {
|
||||||
|
if (!uri.isAbsolute()) {
|
||||||
|
// URI is not absolute implies it is for a local file
|
||||||
|
// normalize the URI
|
||||||
|
return StorageLocation.normalizeFileURI(uri);
|
||||||
|
} else {
|
||||||
|
return uri;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* @param volumeURI URI of the volume
|
||||||
|
* @param blockURI URI of the block
|
||||||
|
* @return true if the {@code blockURI} can belong to the volume or both URIs
|
||||||
|
* are null.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static boolean containsBlock(URI volumeURI, URI blockURI) {
|
||||||
|
if (volumeURI == null && blockURI == null){
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (volumeURI == null || blockURI == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
volumeURI = getAbsoluteURI(volumeURI);
|
||||||
|
blockURI = getAbsoluteURI(blockURI);
|
||||||
|
return !volumeURI.relativize(blockURI).equals(blockURI);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setFileRegionProvider(String bpid, FileRegionProvider provider)
|
||||||
|
throws IOException {
|
||||||
|
ProvidedBlockPoolSlice bp = bpSlices.get(bpid);
|
||||||
|
if (bp == null) {
|
||||||
|
throw new IOException("block pool " + bpid + " is not found");
|
||||||
|
}
|
||||||
|
bp.setFileRegionProvider(provider);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,8 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStreamWriter;
|
import java.io.OutputStreamWriter;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.Channels;
|
import java.nio.channels.Channels;
|
||||||
import java.nio.channels.ReadableByteChannel;
|
import java.nio.channels.ReadableByteChannel;
|
||||||
|
@ -174,15 +176,26 @@ public class TestProvidedImpl {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private int minId;
|
private int minId;
|
||||||
private int numBlocks;
|
private int numBlocks;
|
||||||
|
private Iterator<FileRegion> suppliedIterator;
|
||||||
|
|
||||||
TestFileRegionProvider() {
|
TestFileRegionProvider() {
|
||||||
minId = MIN_BLK_ID;
|
this(null, MIN_BLK_ID, NUM_PROVIDED_BLKS);
|
||||||
numBlocks = NUM_PROVIDED_BLKS;
|
}
|
||||||
|
|
||||||
|
TestFileRegionProvider(Iterator<FileRegion> iterator, int minId,
|
||||||
|
int numBlocks) {
|
||||||
|
this.suppliedIterator = iterator;
|
||||||
|
this.minId = minId;
|
||||||
|
this.numBlocks = numBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<FileRegion> iterator() {
|
public Iterator<FileRegion> iterator() {
|
||||||
return new TestFileRegionIterator(providedBasePath, minId, numBlocks);
|
if (suppliedIterator == null) {
|
||||||
|
return new TestFileRegionIterator(providedBasePath, minId, numBlocks);
|
||||||
|
} else {
|
||||||
|
return suppliedIterator;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -503,4 +516,90 @@ public class TestProvidedImpl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int getBlocksInProvidedVolumes(String basePath, int numBlocks,
|
||||||
|
int minBlockId) throws IOException {
|
||||||
|
TestFileRegionIterator fileRegionIterator =
|
||||||
|
new TestFileRegionIterator(basePath, minBlockId, numBlocks);
|
||||||
|
int totalBlocks = 0;
|
||||||
|
for (int i = 0; i < providedVolumes.size(); i++) {
|
||||||
|
ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i);
|
||||||
|
vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID],
|
||||||
|
new TestFileRegionProvider(fileRegionIterator, minBlockId,
|
||||||
|
numBlocks));
|
||||||
|
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
|
||||||
|
vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null);
|
||||||
|
totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
|
||||||
|
}
|
||||||
|
return totalBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests if the FileRegions provided by the FileRegionProvider
|
||||||
|
* can belong to the Providevolume.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testProvidedVolumeContents() throws IOException {
|
||||||
|
int expectedBlocks = 5;
|
||||||
|
int minId = 0;
|
||||||
|
//use a path which has the same prefix as providedBasePath
|
||||||
|
//all these blocks can belong to the provided volume
|
||||||
|
int blocksFound = getBlocksInProvidedVolumes(providedBasePath + "/test1/",
|
||||||
|
expectedBlocks, minId);
|
||||||
|
assertEquals(
|
||||||
|
"Number of blocks in provided volumes should be " + expectedBlocks,
|
||||||
|
expectedBlocks, blocksFound);
|
||||||
|
blocksFound = getBlocksInProvidedVolumes(
|
||||||
|
"file:/" + providedBasePath + "/test1/", expectedBlocks, minId);
|
||||||
|
assertEquals(
|
||||||
|
"Number of blocks in provided volumes should be " + expectedBlocks,
|
||||||
|
expectedBlocks, blocksFound);
|
||||||
|
//use a path that is entirely different from the providedBasePath
|
||||||
|
//none of these blocks can belong to the volume
|
||||||
|
blocksFound =
|
||||||
|
getBlocksInProvidedVolumes("randomtest1/", expectedBlocks, minId);
|
||||||
|
assertEquals("Number of blocks in provided volumes should be 0", 0,
|
||||||
|
blocksFound);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProvidedVolumeContainsBlock() throws URISyntaxException {
|
||||||
|
assertEquals(true, ProvidedVolumeImpl.containsBlock(null, null));
|
||||||
|
assertEquals(false,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("file:/a"), null));
|
||||||
|
assertEquals(true,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("file:/a/b/c/"),
|
||||||
|
new URI("file:/a/b/c/d/e.file")));
|
||||||
|
assertEquals(true,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("/a/b/c/"),
|
||||||
|
new URI("file:/a/b/c/d/e.file")));
|
||||||
|
assertEquals(true,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("/a/b/c"),
|
||||||
|
new URI("file:/a/b/c/d/e.file")));
|
||||||
|
assertEquals(true,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("/a/b/c/"),
|
||||||
|
new URI("/a/b/c/d/e.file")));
|
||||||
|
assertEquals(true,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("file:/a/b/c/"),
|
||||||
|
new URI("/a/b/c/d/e.file")));
|
||||||
|
assertEquals(false,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("/a/b/e"),
|
||||||
|
new URI("file:/a/b/c/d/e.file")));
|
||||||
|
assertEquals(false,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("file:/a/b/e"),
|
||||||
|
new URI("file:/a/b/c/d/e.file")));
|
||||||
|
assertEquals(true,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("s3a:/bucket1/dir1/"),
|
||||||
|
new URI("s3a:/bucket1/dir1/temp.txt")));
|
||||||
|
assertEquals(false,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("s3a:/bucket2/dir1/"),
|
||||||
|
new URI("s3a:/bucket1/dir1/temp.txt")));
|
||||||
|
assertEquals(false,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("s3a:/bucket1/dir1/"),
|
||||||
|
new URI("s3a:/bucket1/temp.txt")));
|
||||||
|
assertEquals(false,
|
||||||
|
ProvidedVolumeImpl.containsBlock(new URI("/bucket1/dir1/"),
|
||||||
|
new URI("s3a:/bucket1/dir1/temp.txt")));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue