diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 00976f98cab..7db0a8df64d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -352,6 +352,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_PROVIDED_ALIASMAP_TEXT_CODEC = "dfs.provided.aliasmap.text.codec"; public static final String DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH = "dfs.provided.aliasmap.text.write.path"; + public static final String DFS_PROVIDED_ALIASMAP_LEVELDB_PATH = "dfs.provided.aliasmap.leveldb.read.path"; + public static final String DFS_LIST_LIMIT = "dfs.ls.limit"; public static final int DFS_LIST_LIMIT_DEFAULT = 1000; public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/LevelDBFileRegionAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/LevelDBFileRegionAliasMap.java new file mode 100644 index 00000000000..66971a3f307 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/LevelDBFileRegionAliasMap.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; + +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBIterator; +import static org.fusesource.leveldbjni.JniDBFactory.factory; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; +import org.apache.hadoop.hdfs.server.common.FileRegion; +import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LEVELDB_PATH; +import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.fromBlockBytes; +import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.fromProvidedStorageLocationBytes; +import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.toProtoBufBytes; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A LevelDB based implementation of {@link BlockAliasMap}. + */ +public class LevelDBFileRegionAliasMap + extends BlockAliasMap implements Configurable { + + private Configuration conf; + private LevelDBOptions opts = new LevelDBOptions(); + + public static final Logger LOG = + LoggerFactory.getLogger(LevelDBFileRegionAliasMap.class); + + @Override + public void setConf(Configuration conf) { + opts.setConf(conf); + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public Reader getReader(Reader.Options opts) throws IOException { + if (null == opts) { + opts = this.opts; + } + if (!(opts instanceof LevelDBOptions)) { + throw new IllegalArgumentException("Invalid options " + opts.getClass()); + } + LevelDBOptions o = (LevelDBOptions) opts; + return new LevelDBFileRegionAliasMap.LevelDBReader( + createDB(o.levelDBPath, false)); + } + + @Override + public Writer getWriter(Writer.Options opts) throws IOException { + if (null == opts) { + opts = this.opts; + } + if (!(opts instanceof LevelDBOptions)) { + throw new IllegalArgumentException("Invalid options " + opts.getClass()); + } + LevelDBOptions o = (LevelDBOptions) opts; + return new LevelDBFileRegionAliasMap.LevelDBWriter( + createDB(o.levelDBPath, true)); + } + + private static DB createDB(String levelDBPath, boolean createIfMissing) + throws IOException { + if (levelDBPath == null || levelDBPath.length() == 0) { + throw new IllegalArgumentException( + "A valid path needs to be specified for " + + LevelDBFileRegionAliasMap.class + " using the parameter " + + DFS_PROVIDED_ALIASMAP_LEVELDB_PATH); + } + org.iq80.leveldb.Options options = new org.iq80.leveldb.Options(); + options.createIfMissing(createIfMissing); + return factory.open(new File(levelDBPath), options); + } + + @Override + public void refresh() throws IOException { + } + + @Override + public void close() throws IOException { + // Do nothing. + } + + /** + * Class specifying reader options for the {@link LevelDBFileRegionAliasMap}. + */ + public static class LevelDBOptions implements LevelDBReader.Options, + LevelDBWriter.Options, Configurable { + private Configuration conf; + private String levelDBPath; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.levelDBPath = conf.get(DFS_PROVIDED_ALIASMAP_LEVELDB_PATH); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public LevelDBOptions filename(String levelDBPath) { + this.levelDBPath = levelDBPath; + return this; + } + } + + /** + * This class is used as a reader for block maps which + * are stored as LevelDB files. + */ + public static class LevelDBReader extends Reader { + + /** + * Options for {@link LevelDBReader}. + */ + public interface Options extends Reader.Options { + Options filename(String levelDBPath); + } + + private DB db; + + LevelDBReader(DB db) { + this.db = db; + } + + @Override + public Optional resolve(Block block) throws IOException { + if (db == null) { + return Optional.empty(); + } + // consider layering index w/ composable format + byte[] key = toProtoBufBytes(block); + byte[] value = db.get(key); + ProvidedStorageLocation psl = fromProvidedStorageLocationBytes(value); + return Optional.of(new FileRegion(block, psl)); + } + + static class FRIterator implements Iterator { + private final DBIterator internal; + + FRIterator(DBIterator internal) { + this.internal = internal; + } + + @Override + public boolean hasNext() { + return internal.hasNext(); + } + + @Override + public FileRegion next() { + Map.Entry entry = internal.next(); + if (entry == null) { + return null; + } + try { + Block block = fromBlockBytes(entry.getKey()); + ProvidedStorageLocation psl = + fromProvidedStorageLocationBytes(entry.getValue()); + return new FileRegion(block, psl); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + public Iterator iterator() { + if (db == null) { + return null; + } + DBIterator iterator = db.iterator(); + iterator.seekToFirst(); + return new FRIterator(iterator); + } + + @Override + public void close() throws IOException { + if (db != null) { + db.close(); + } + } + } + + /** + * This class is used as a writer for block maps which + * are stored as LevelDB files. + */ + public static class LevelDBWriter extends Writer { + + /** + * Interface for Writer options. + */ + public interface Options extends Writer.Options { + Options filename(String levelDBPath); + } + + private final DB db; + + LevelDBWriter(DB db) { + this.db = db; + } + + @Override + public void store(FileRegion token) throws IOException { + byte[] key = toProtoBufBytes(token.getBlock()); + byte[] value = toProtoBufBytes(token.getProvidedStorageLocation()); + db.put(key, value); + } + + @Override + public void close() throws IOException { + if (db != null) { + db.close(); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestLevelDBFileRegionAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestLevelDBFileRegionAliasMap.java new file mode 100644 index 00000000000..21199e1d126 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestLevelDBFileRegionAliasMap.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.FileRegion; +import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.util.Iterator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Tests for the {@link LevelDBFileRegionAliasMap}. + */ +public class TestLevelDBFileRegionAliasMap { + + /** + * A basic test to verify that we can write data and read it back again. + * @throws Exception + */ + @Test + public void testReadBack() throws Exception { + File dbFile = Files.createTempDirectory("fileregionformat") + .toFile(); + try { + LevelDBFileRegionAliasMap frf = new LevelDBFileRegionAliasMap(); + LevelDBFileRegionAliasMap.LevelDBOptions opts = + new LevelDBFileRegionAliasMap.LevelDBOptions() + .filename(dbFile.getAbsolutePath()); + BlockAliasMap.Writer writer = frf.getWriter(opts); + + FileRegion fr = new FileRegion(1, new Path("/file"), 1, 1, 1); + writer.store(fr); + writer.close(); + + BlockAliasMap.Reader reader = frf.getReader(opts); + FileRegion fr2 = reader.resolve(new Block(1, 1, 1)).get(); + assertEquals(fr, fr2); + reader.close(); + } finally { + dbFile.delete(); + } + } + + @Test + /** + * A basic test to verify that we can read a bunch of data that we've written. + */ + public void testIterate() throws Exception { + FileRegion[] regions = new FileRegion[10]; + regions[0] = new FileRegion(1, new Path("/file1"), 0, 1024, 1); + regions[1] = new FileRegion(2, new Path("/file1"), 1024, 1024, 1); + regions[2] = new FileRegion(3, new Path("/file1"), 2048, 1024, 1); + regions[3] = new FileRegion(4, new Path("/file2"), 0, 1024, 1); + regions[4] = new FileRegion(5, new Path("/file2"), 1024, 1024, 1); + regions[5] = new FileRegion(6, new Path("/file2"), 2048, 1024, 1); + regions[6] = new FileRegion(7, new Path("/file2"), 3072, 1024, 1); + regions[7] = new FileRegion(8, new Path("/file3"), 0, 1024, 1); + regions[8] = new FileRegion(9, new Path("/file4"), 0, 1024, 1); + regions[9] = new FileRegion(10, new Path("/file5"), 0, 1024, 1); + File dbFile = Files.createTempDirectory("fileregionformat") + .toFile(); + try { + LevelDBFileRegionAliasMap frf = new LevelDBFileRegionAliasMap(); + LevelDBFileRegionAliasMap.LevelDBOptions opts = + new LevelDBFileRegionAliasMap.LevelDBOptions() + .filename(dbFile.getAbsolutePath()); + BlockAliasMap.Writer writer = frf.getWriter(opts); + + for (FileRegion fr : regions) { + writer.store(fr); + } + writer.close(); + + BlockAliasMap.Reader reader = frf.getReader(opts); + Iterator it = reader.iterator(); + int last = -1; + int count = 0; + while(it.hasNext()) { + FileRegion fr = it.next(); + int blockId = (int)fr.getBlock().getBlockId(); + assertEquals(regions[blockId-1], fr); + assertNotEquals(blockId, last); + last = blockId; + count++; + } + assertEquals(count, 10); + + reader.close(); + } finally { + dbFile.delete(); + } + } +}