HDFS-12591. [READ] Implement LevelDBFileRegionFormat. Contributed by Ewan Higgs.

This commit is contained in:
Virajith Jalaparti 2017-12-02 12:22:00 -08:00 committed by Chris Douglas
parent 352f994b64
commit b634053c4d
3 changed files with 374 additions and 0 deletions

View File

@ -352,6 +352,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_PROVIDED_ALIASMAP_TEXT_CODEC = "dfs.provided.aliasmap.text.codec";
public static final String DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH = "dfs.provided.aliasmap.text.write.path";
public static final String DFS_PROVIDED_ALIASMAP_LEVELDB_PATH = "dfs.provided.aliasmap.leveldb.read.path";
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";

View File

@ -0,0 +1,257 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import static org.fusesource.leveldbjni.JniDBFactory.factory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LEVELDB_PATH;
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.fromBlockBytes;
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.fromProvidedStorageLocationBytes;
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.toProtoBufBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A LevelDB based implementation of {@link BlockAliasMap}.
*/
public class LevelDBFileRegionAliasMap
extends BlockAliasMap<FileRegion> implements Configurable {
private Configuration conf;
private LevelDBOptions opts = new LevelDBOptions();
public static final Logger LOG =
LoggerFactory.getLogger(LevelDBFileRegionAliasMap.class);
@Override
public void setConf(Configuration conf) {
opts.setConf(conf);
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public Reader<FileRegion> getReader(Reader.Options opts) throws IOException {
if (null == opts) {
opts = this.opts;
}
if (!(opts instanceof LevelDBOptions)) {
throw new IllegalArgumentException("Invalid options " + opts.getClass());
}
LevelDBOptions o = (LevelDBOptions) opts;
return new LevelDBFileRegionAliasMap.LevelDBReader(
createDB(o.levelDBPath, false));
}
@Override
public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
if (null == opts) {
opts = this.opts;
}
if (!(opts instanceof LevelDBOptions)) {
throw new IllegalArgumentException("Invalid options " + opts.getClass());
}
LevelDBOptions o = (LevelDBOptions) opts;
return new LevelDBFileRegionAliasMap.LevelDBWriter(
createDB(o.levelDBPath, true));
}
private static DB createDB(String levelDBPath, boolean createIfMissing)
throws IOException {
if (levelDBPath == null || levelDBPath.length() == 0) {
throw new IllegalArgumentException(
"A valid path needs to be specified for "
+ LevelDBFileRegionAliasMap.class + " using the parameter "
+ DFS_PROVIDED_ALIASMAP_LEVELDB_PATH);
}
org.iq80.leveldb.Options options = new org.iq80.leveldb.Options();
options.createIfMissing(createIfMissing);
return factory.open(new File(levelDBPath), options);
}
@Override
public void refresh() throws IOException {
}
@Override
public void close() throws IOException {
// Do nothing.
}
/**
* Class specifying reader options for the {@link LevelDBFileRegionAliasMap}.
*/
public static class LevelDBOptions implements LevelDBReader.Options,
LevelDBWriter.Options, Configurable {
private Configuration conf;
private String levelDBPath;
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.levelDBPath = conf.get(DFS_PROVIDED_ALIASMAP_LEVELDB_PATH);
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public LevelDBOptions filename(String levelDBPath) {
this.levelDBPath = levelDBPath;
return this;
}
}
/**
* This class is used as a reader for block maps which
* are stored as LevelDB files.
*/
public static class LevelDBReader extends Reader<FileRegion> {
/**
* Options for {@link LevelDBReader}.
*/
public interface Options extends Reader.Options {
Options filename(String levelDBPath);
}
private DB db;
LevelDBReader(DB db) {
this.db = db;
}
@Override
public Optional<FileRegion> resolve(Block block) throws IOException {
if (db == null) {
return Optional.empty();
}
// consider layering index w/ composable format
byte[] key = toProtoBufBytes(block);
byte[] value = db.get(key);
ProvidedStorageLocation psl = fromProvidedStorageLocationBytes(value);
return Optional.of(new FileRegion(block, psl));
}
static class FRIterator implements Iterator<FileRegion> {
private final DBIterator internal;
FRIterator(DBIterator internal) {
this.internal = internal;
}
@Override
public boolean hasNext() {
return internal.hasNext();
}
@Override
public FileRegion next() {
Map.Entry<byte[], byte[]> entry = internal.next();
if (entry == null) {
return null;
}
try {
Block block = fromBlockBytes(entry.getKey());
ProvidedStorageLocation psl =
fromProvidedStorageLocationBytes(entry.getValue());
return new FileRegion(block, psl);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
public Iterator<FileRegion> iterator() {
if (db == null) {
return null;
}
DBIterator iterator = db.iterator();
iterator.seekToFirst();
return new FRIterator(iterator);
}
@Override
public void close() throws IOException {
if (db != null) {
db.close();
}
}
}
/**
* This class is used as a writer for block maps which
* are stored as LevelDB files.
*/
public static class LevelDBWriter extends Writer<FileRegion> {
/**
* Interface for Writer options.
*/
public interface Options extends Writer.Options {
Options filename(String levelDBPath);
}
private final DB db;
LevelDBWriter(DB db) {
this.db = db;
}
@Override
public void store(FileRegion token) throws IOException {
byte[] key = toProtoBufBytes(token.getBlock());
byte[] value = toProtoBufBytes(token.getProvidedStorageLocation());
db.put(key, value);
}
@Override
public void close() throws IOException {
if (db != null) {
db.close();
}
}
}
}

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.junit.Test;
import java.io.File;
import java.nio.file.Files;
import java.util.Iterator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
/**
* Tests for the {@link LevelDBFileRegionAliasMap}.
*/
public class TestLevelDBFileRegionAliasMap {
/**
* A basic test to verify that we can write data and read it back again.
* @throws Exception
*/
@Test
public void testReadBack() throws Exception {
File dbFile = Files.createTempDirectory("fileregionformat")
.toFile();
try {
LevelDBFileRegionAliasMap frf = new LevelDBFileRegionAliasMap();
LevelDBFileRegionAliasMap.LevelDBOptions opts =
new LevelDBFileRegionAliasMap.LevelDBOptions()
.filename(dbFile.getAbsolutePath());
BlockAliasMap.Writer<FileRegion> writer = frf.getWriter(opts);
FileRegion fr = new FileRegion(1, new Path("/file"), 1, 1, 1);
writer.store(fr);
writer.close();
BlockAliasMap.Reader<FileRegion> reader = frf.getReader(opts);
FileRegion fr2 = reader.resolve(new Block(1, 1, 1)).get();
assertEquals(fr, fr2);
reader.close();
} finally {
dbFile.delete();
}
}
@Test
/**
* A basic test to verify that we can read a bunch of data that we've written.
*/
public void testIterate() throws Exception {
FileRegion[] regions = new FileRegion[10];
regions[0] = new FileRegion(1, new Path("/file1"), 0, 1024, 1);
regions[1] = new FileRegion(2, new Path("/file1"), 1024, 1024, 1);
regions[2] = new FileRegion(3, new Path("/file1"), 2048, 1024, 1);
regions[3] = new FileRegion(4, new Path("/file2"), 0, 1024, 1);
regions[4] = new FileRegion(5, new Path("/file2"), 1024, 1024, 1);
regions[5] = new FileRegion(6, new Path("/file2"), 2048, 1024, 1);
regions[6] = new FileRegion(7, new Path("/file2"), 3072, 1024, 1);
regions[7] = new FileRegion(8, new Path("/file3"), 0, 1024, 1);
regions[8] = new FileRegion(9, new Path("/file4"), 0, 1024, 1);
regions[9] = new FileRegion(10, new Path("/file5"), 0, 1024, 1);
File dbFile = Files.createTempDirectory("fileregionformat")
.toFile();
try {
LevelDBFileRegionAliasMap frf = new LevelDBFileRegionAliasMap();
LevelDBFileRegionAliasMap.LevelDBOptions opts =
new LevelDBFileRegionAliasMap.LevelDBOptions()
.filename(dbFile.getAbsolutePath());
BlockAliasMap.Writer<FileRegion> writer = frf.getWriter(opts);
for (FileRegion fr : regions) {
writer.store(fr);
}
writer.close();
BlockAliasMap.Reader<FileRegion> reader = frf.getReader(opts);
Iterator<FileRegion> it = reader.iterator();
int last = -1;
int count = 0;
while(it.hasNext()) {
FileRegion fr = it.next();
int blockId = (int)fr.getBlock().getBlockId();
assertEquals(regions[blockId-1], fr);
assertNotEquals(blockId, last);
last = blockId;
count++;
}
assertEquals(count, 10);
reader.close();
} finally {
dbFile.delete();
}
}
}