diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index cebb1b0ea7d..1822a2ac4a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -61,6 +61,15 @@ public final class OzoneConfigKeys {
   public static final String OZONE_CONTAINER_METADATA_DIRS =
       "ozone.container.metadata.dirs";
 
+  public static final String OZONE_METADATA_STORE_IMPL =
+      "ozone.metastore.impl";
+  public static final String OZONE_METADATA_STORE_IMPL_LEVELDB =
+      "LevelDB";
+  public static final String OZONE_METADATA_STORE_IMPL_ROCKSDB =
+      "RocksDB";
+  public static final String OZONE_METADATA_STORE_IMPL_DEFAULT =
+      OZONE_METADATA_STORE_IMPL_LEVELDB;
+
   public static final String OZONE_KEY_CACHE = "ozone.key.cache.size";
   public static final int OZONE_KEY_CACHE_DEFAULT = 1024;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index 7d0e75667ca..0a360e2c649 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -27,8 +27,9 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
-import org.apache.hadoop.utils.LevelDBStore;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -233,7 +234,8 @@ public final class ContainerUtils {
    * @param containerPath - Container Path.
    * @throws IOException
    */
-  public static Path createMetadata(Path containerPath) throws IOException {
+  public static Path createMetadata(Path containerPath, Configuration conf)
+      throws IOException {
     Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
     Preconditions.checkNotNull(containerPath);
     Path metadataPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
@@ -243,9 +245,11 @@ public final class ContainerUtils {
       throw new IOException("Unable to create directory for metadata storage." +
           " Path: " + metadataPath);
     }
-    LevelDBStore store =
-        new LevelDBStore(metadataPath.resolve(OzoneConsts.CONTAINER_DB)
-            .toFile(), true);
+    MetadataStore store = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(metadataPath.resolve(OzoneConsts.CONTAINER_DB).toFile())
+        .build();
     // we close since the SCM pre-creates containers.
     // we will open and put Db handle into a cache when keys are being created
@@ -347,7 +351,7 @@ public final class ContainerUtils {
     Preconditions.checkNotNull(containerData);
     Path dbPath = Paths.get(containerData.getDBPath());
 
-    LevelDBStore db = KeyUtils.getDB(containerData, conf);
+    MetadataStore db = KeyUtils.getDB(containerData, conf);
     // If the container is not empty and cannot be deleted forcibly,
     // then throw a SCE to stop deleting.
if(!forceDelete && !db.isEmpty()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java index 4eb89990d20..1a36b7129f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java @@ -23,7 +23,8 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl; import org.apache.hadoop.ozone.container.common.utils.ContainerCache; -import org.apache.hadoop.utils.LevelDBStore; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,20 +58,21 @@ public final class KeyUtils { * * @param container container. * @param conf configuration. - * @return LevelDB handle. + * @return MetadataStore handle. * @throws StorageContainerException */ - public static LevelDBStore getDB(ContainerData container, + public static MetadataStore getDB(ContainerData container, Configuration conf) throws StorageContainerException { Preconditions.checkNotNull(container); ContainerCache cache = ContainerCache.getInstance(conf); Preconditions.checkNotNull(cache); try { - LevelDBStore db = cache.getDB(container.getContainerName()); + MetadataStore db = cache.getDB(container.getContainerName()); if (db == null) { - db = new LevelDBStore( - new File(container.getDBPath()), - false); + db = MetadataStoreBuilder.newBuilder() + .setDbFile(new File(container.getDBPath())) + .setCreateIfMissing(false) + .build(); cache.putDB(container.getContainerName(), db); } return db; @@ -103,10 +105,10 @@ public final class KeyUtils { @SuppressWarnings("unchecked") public static void shutdownCache(ContainerCache cache) { Logger log = LoggerFactory.getLogger(KeyManagerImpl.class); - LevelDBStore[] handles = new LevelDBStore[cache.values().size()]; + MetadataStore[] handles = new MetadataStore[cache.values().size()]; cache.values().toArray(handles); Preconditions.checkState(handles.length == cache.values().size()); - for (LevelDBStore db : handles) { + for (MetadataStore db : handles) { try { db.close(); } catch (IOException ex) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java index 2b64b714fd2..7fff0a34b47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java @@ -30,8 +30,9 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; import org.apache.hadoop.ozone.container.common.utils.ContainerCache; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; -import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter; -import org.apache.hadoop.utils.LevelDBStore; +import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; +import 
org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; +import org.apache.hadoop.utils.MetadataStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,12 +41,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .Result.IO_EXCEPTION; import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos .Result.NO_SUCH_KEY; - /** * Key Manager impl. */ @@ -74,8 +72,7 @@ public class KeyManagerImpl implements KeyManager { * {@inheritDoc} */ @Override - public void putKey(Pipeline pipeline, KeyData data) - throws StorageContainerException { + public void putKey(Pipeline pipeline, KeyData data) throws IOException { containerManager.readLock(); try { // We are not locking the key manager since LevelDb serializes all actions @@ -85,7 +82,7 @@ public class KeyManagerImpl implements KeyManager { "Container name cannot be null"); ContainerData cData = containerManager.readContainer( pipeline.getContainerName()); - LevelDBStore db = KeyUtils.getDB(cData, conf); + MetadataStore db = KeyUtils.getDB(cData, conf); // This is a post condition that acts as a hint to the user. // Should never fail. @@ -102,7 +99,7 @@ public class KeyManagerImpl implements KeyManager { * {@inheritDoc} */ @Override - public KeyData getKey(KeyData data) throws StorageContainerException { + public KeyData getKey(KeyData data) throws IOException { containerManager.readLock(); try { Preconditions.checkNotNull(data, "Key data cannot be null"); @@ -110,7 +107,7 @@ public class KeyManagerImpl implements KeyManager { "Container name cannot be null"); ContainerData cData = containerManager.readContainer(data .getContainerName()); - LevelDBStore db = KeyUtils.getDB(cData, conf); + MetadataStore db = KeyUtils.getDB(cData, conf); // This is a post condition that acts as a hint to the user. // Should never fail. @@ -124,8 +121,6 @@ public class KeyManagerImpl implements KeyManager { ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(kData); return KeyData.getFromProtoBuf(keyData); - } catch (IOException ex) { - throw new StorageContainerException(ex, IO_EXCEPTION); } finally { containerManager.readUnlock(); } @@ -136,7 +131,7 @@ public class KeyManagerImpl implements KeyManager { */ @Override public void deleteKey(Pipeline pipeline, String keyName) - throws StorageContainerException { + throws IOException { containerManager.readLock(); try { Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); @@ -144,7 +139,7 @@ public class KeyManagerImpl implements KeyManager { "Container name cannot be null"); ContainerData cData = containerManager.readContainer(pipeline .getContainerName()); - LevelDBStore db = KeyUtils.getDB(cData, conf); + MetadataStore db = KeyUtils.getDB(cData, conf); // This is a post condition that acts as a hint to the user. // Should never fail. @@ -171,32 +166,28 @@ public class KeyManagerImpl implements KeyManager { @Override public List listKey( Pipeline pipeline, String prefix, String startKey, int count) - throws StorageContainerException { + throws IOException { Preconditions.checkNotNull(pipeline, "Pipeline cannot be null."); Preconditions.checkArgument(count > 0, "Count must be a positive number."); ContainerData cData = containerManager.readContainer(pipeline .getContainerName()); - LevelDBStore db = KeyUtils.getDB(cData, conf); - try { - List result = new ArrayList(); - byte[] startKeyInBytes = startKey == null ? 
null : - DFSUtil.string2Bytes(startKey); - KeyPrefixFilter prefixFilter = new KeyPrefixFilter(prefix); - List> range = - db.getRangeKVs(startKeyInBytes, count, prefixFilter); - for(Map.Entry entry : range) { - String keyName = KeyUtils.getKeyName(entry.getKey()); - KeyData value = KeyUtils.getKeyData(entry.getValue()); - KeyData data = new KeyData(value.getContainerName(), keyName); - result.add(data); - } - return result; - } catch (IOException e) { - throw new StorageContainerException(e, - ContainerProtos.Result.IO_EXCEPTION); + MetadataStore db = KeyUtils.getDB(cData, conf); + + List result = new ArrayList(); + byte[] startKeyInBytes = startKey == null ? null : + DFSUtil.string2Bytes(startKey); + MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefix); + List> range = + db.getRangeKVs(startKeyInBytes, count, prefixFilter); + for (Map.Entry entry : range) { + String keyName = KeyUtils.getKeyName(entry.getKey()); + KeyData value = KeyUtils.getKeyData(entry.getValue()); + KeyData data = new KeyData(value.getContainerName(), keyName); + result.add(data); } + return result; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java index a362d076e29..a613d2aca0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java @@ -21,6 +21,7 @@ import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.KeyData; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import java.io.IOException; import java.util.List; /** @@ -32,18 +33,18 @@ public interface KeyManager { * * @param pipeline - Pipeline. * @param data - Key Data. - * @throws StorageContainerException + * @throws IOException */ - void putKey(Pipeline pipeline, KeyData data) throws StorageContainerException; + void putKey(Pipeline pipeline, KeyData data) throws IOException; /** * Gets an existing key. * * @param data - Key Data. * @return Key Data. - * @throws StorageContainerException + * @throws IOException */ - KeyData getKey(KeyData data) throws StorageContainerException; + KeyData getKey(KeyData data) throws IOException; /** * Deletes an existing Key. @@ -53,7 +54,7 @@ public interface KeyManager { * @throws StorageContainerException */ void deleteKey(Pipeline pipeline, String keyName) - throws StorageContainerException; + throws IOException; /** * List keys in a container. @@ -65,7 +66,7 @@ public interface KeyManager { * @return List of Keys that match the criteria. */ List listKey(Pipeline pipeline, String prefix, String startKey, - int count) throws StorageContainerException; + int count) throws IOException; /** * Shutdown keyManager. 
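A minimal, self-contained usage sketch of the new MetadataStoreBuilder API (illustrative only, not part of the patch): the backend is selected through the ozone.metastore.impl key added above and defaults to LevelDB. The class name and the /tmp path are invented for illustration; the builder methods, config constants, and DFSUtil helpers are the ones that appear in this diff.

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;

/**
 * Illustrative only -- not part of this patch. Shows how a caller obtains a
 * store through the builder instead of constructing a LevelDBStore directly.
 */
public final class MetadataStoreExample {
  public static void main(String[] args) throws IOException {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Pick the RocksDB backend; leaving the key unset falls back to LevelDB.
    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
        OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB);

    // The directory and file name below are made up for this example.
    MetadataStore store = MetadataStoreBuilder.newBuilder()
        .setConf(conf)
        .setDbFile(new File("/tmp/ozone-example", "sample.db"))
        .setCreateIfMissing(true)
        .build();
    try {
      // Callers only see the MetadataStore interface, never the backend type.
      store.put(DFSUtil.string2Bytes("key1"), DFSUtil.string2Bytes("value1"));
      byte[] value = store.get(DFSUtil.string2Bytes("key1"));
      System.out.println(DFSUtil.bytes2String(value));
    } finally {
      store.close();
    }
  }
}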
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java index 81cdcf2d9a8..f4caad027c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java @@ -24,7 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.utils.LevelDBStore; +import org.apache.hadoop.utils.MetadataStore; import java.io.IOException; import java.util.concurrent.locks.Lock; @@ -69,7 +69,7 @@ public final class ContainerCache extends LRUMap { protected boolean removeLRU(LinkEntry entry) { lock.lock(); try { - LevelDBStore db = (LevelDBStore) entry.getValue(); + MetadataStore db = (MetadataStore) entry.getValue(); db.close(); } catch (IOException e) { LOG.error("Error closing DB. Container: " + entry.getKey().toString(), e); @@ -83,14 +83,14 @@ public final class ContainerCache extends LRUMap { * Returns a DB handle if available, null otherwise. * * @param containerName - Name of the container. - * @return OzoneLevelDBStore. + * @return MetadataStore. */ - public LevelDBStore getDB(String containerName) { + public MetadataStore getDB(String containerName) { Preconditions.checkNotNull(containerName); Preconditions.checkState(!containerName.isEmpty()); lock.lock(); try { - return (LevelDBStore) this.get(containerName); + return (MetadataStore) this.get(containerName); } finally { lock.unlock(); } @@ -106,7 +106,7 @@ public final class ContainerCache extends LRUMap { Preconditions.checkState(!containerName.isEmpty()); lock.lock(); try { - LevelDBStore db = this.getDB(containerName); + MetadataStore db = this.getDB(containerName); if (db != null) { try { db.close(); @@ -126,7 +126,7 @@ public final class ContainerCache extends LRUMap { * @param containerName - Name of the container * @param db - DB handle */ - public void putDB(String containerName, LevelDBStore db) { + public void putDB(String containerName, MetadataStore db) { Preconditions.checkNotNull(containerName); Preconditions.checkState(!containerName.isEmpty()); lock.lock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java index 1e423072be9..36e5b3ac6a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java @@ -19,10 +19,10 @@ package org.apache.hadoop.ozone.ksm; import org.apache.hadoop.ksm.helpers.KsmBucketInfo; import org.apache.hadoop.ksm.helpers.KsmKeyInfo; import org.apache.hadoop.ksm.helpers.KsmVolumeArgs; +import org.apache.hadoop.utils.BatchOperation; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.concurrent.locks.Lock; /** @@ -56,36 +56,27 @@ public interface MetadataManager { * @param key - key * @return value */ - byte[] get(byte[] key); + byte[] get(byte[] key) throws IOException; /** * Puts a Key into Metadata DB. 
* @param key - key * @param value - value */ - void put(byte[] key, byte[] value); + void put(byte[] key, byte[] value) throws IOException; /** * Deletes a Key from Metadata DB. * @param key - key */ - void delete(byte[] key); + void delete(byte[] key) throws IOException; /** - * Performs batch Put and Delete to Metadata DB. - * Can be used to do multiple puts and deletes atomically. - * @param putList - list of Key/Value to put into DB - * @param delList - list of Key to delete from DB + * Atomic write a batch of operations. + * @param batch + * @throws IOException */ - void batchPutDelete(List> putList, - List delList) throws IOException; - - /** - * Performs a batch Put to Metadata DB. - * Can be used to do multiple puts atomically. - * @param putList - list of Key/Value to put into DB - */ - void batchPut(List> putList) throws IOException; + void writeBatch(BatchOperation batch) throws IOException; /** * Given a volume return the corresponding DB key. @@ -120,7 +111,7 @@ public interface MetadataManager { * * @param key - key name */ - void deleteKey(byte[] key); + void deleteKey(byte[] key) throws IOException; /** * Given a volume, check if it is empty, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java index dd3d524188e..7e48eda4fc0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.ksm; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.ksm.helpers.KsmBucketInfo; import org.apache.hadoop.ksm.helpers.KsmKeyInfo; @@ -32,12 +32,11 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyI import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo; import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList; import org.apache.hadoop.ozone.web.utils.OzoneUtils; -import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter; -import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter; -import org.apache.hadoop.utils.LevelDBStore; -import org.iq80.leveldb.DBIterator; -import org.iq80.leveldb.Options; -import org.iq80.leveldb.WriteBatch; +import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; +import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; import java.io.File; import java.io.IOException; @@ -57,9 +56,9 @@ import static org.apache.hadoop.ozone.ksm.KSMConfigKeys /** * KSM metadata manager interface. 
*/ -public class MetadataManagerImpl implements MetadataManager { +public class MetadataManagerImpl implements MetadataManager { - private final LevelDBStore store; + private final MetadataStore store; private final ReadWriteLock lock; @@ -67,10 +66,12 @@ public class MetadataManagerImpl implements MetadataManager { File metaDir = OzoneUtils.getScmMetadirPath(conf); final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB, OZONE_KSM_DB_CACHE_SIZE_DEFAULT); - Options options = new Options(); - options.cacheSize(cacheSize * OzoneConsts.MB); File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME); - this.store = new LevelDBStore(ksmDBFile, options); + this.store = MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(ksmDBFile) + .setCacheSize(cacheSize * OzoneConsts.MB) + .build(); this.lock = new ReentrantReadWriteLock(); } @@ -153,7 +154,7 @@ public class MetadataManagerImpl implements MetadataManager { * @param key - key name */ @Override - public void deleteKey(byte[] key) { + public void deleteKey(byte[] key) throws IOException { store.delete(key); } @@ -181,7 +182,7 @@ public class MetadataManagerImpl implements MetadataManager { * @return value */ @Override - public byte[] get(byte[] key) { + public byte[] get(byte[] key) throws IOException { return store.get(key); } @@ -191,7 +192,7 @@ public class MetadataManagerImpl implements MetadataManager { * @param value - value */ @Override - public void put(byte[] key, byte[] value) { + public void put(byte[] key, byte[] value) throws IOException { store.put(key, value); } @@ -199,45 +200,13 @@ public class MetadataManagerImpl implements MetadataManager { * Deletes a Key from Metadata DB. * @param key - key */ - public void delete(byte[] key) { + public void delete(byte[] key) throws IOException { store.delete(key); } - /** - * Performs a batch Put and Delete from Metadata DB. - * Can be used to do multiple puts and deletes atomically. - * @param putList - list of key and value pairs to put to Metadata DB. - * @param delList - list of keys to delete from Metadata DB. - */ @Override - public void batchPutDelete(List> putList, - List delList) - throws IOException { - WriteBatch batch = store.createWriteBatch(); - putList.forEach(entry -> batch.put(entry.getKey(), entry.getValue())); - delList.forEach(entry -> batch.delete(entry)); - try { - store.commitWriteBatch(batch); - } finally { - store.closeWriteBatch(batch); - } - } - - /** - * Performs a batch Put to Metadata DB. - * Can be used to do multiple puts atomically. 
- * @param list - list of Map.Entry - */ - @Override - public void batchPut(List> list) - throws IOException { - WriteBatch batch = store.createWriteBatch(); - list.forEach(entry -> batch.put(entry.getKey(), entry.getValue())); - try { - store.commitWriteBatch(batch); - } finally { - store.closeWriteBatch(batch); - } + public void writeBatch(BatchOperation batch) throws IOException { + this.store.writeBatch(batch); } /** @@ -246,21 +215,17 @@ public class MetadataManagerImpl implements MetadataManager { * @return true if the volume is empty */ public boolean isVolumeEmpty(String volume) throws IOException { - try (DBIterator iterator = store.getIterator()) { - String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume - + OzoneConsts.KSM_BUCKET_PREFIX; - byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName); - // Seek to the root of the volume and look for the next key - iterator.seek(dbVolumeRootKey); - if (iterator.hasNext()) { - String firstBucketKey = DFSUtil.bytes2String(iterator.next().getKey()); - // if the key starts with // - // then there is at least one bucket - return !firstBucketKey.startsWith(dbVolumeRootName); - } else { - return true; - } + String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume; + byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName); + // Seek to the root of the volume and look for the next key + ImmutablePair volumeRoot = + store.peekAround(1, dbVolumeRootKey); + if (volumeRoot != null) { + String firstBucketKey = DFSUtil.bytes2String(volumeRoot.getKey()); + return !firstBucketKey.startsWith(dbVolumeRootName + + OzoneConsts.KSM_BUCKET_PREFIX); } + return true; } /** @@ -272,18 +237,15 @@ public class MetadataManagerImpl implements MetadataManager { */ public boolean isBucketEmpty(String volume, String bucket) throws IOException { - try (DBIterator iterator = store.getIterator()) { - String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume - + OzoneConsts.KSM_BUCKET_PREFIX + bucket - + OzoneConsts.KSM_KEY_PREFIX; - byte[] keyRoot = DFSUtil.string2Bytes(keyRootName); - iterator.seek(keyRoot); - if(iterator.hasNext()) { - return !DFSUtil.bytes2String(iterator.next().getKey()) - .startsWith(keyRootName); - } - return true; + String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume + + OzoneConsts.KSM_BUCKET_PREFIX + bucket; + byte[] keyRoot = DFSUtil.string2Bytes(keyRootName); + ImmutablePair firstKey = store.peekAround(1, keyRoot); + if (firstKey != null) { + return !DFSUtil.bytes2String(firstKey.getKey()) + .startsWith(keyRootName + OzoneConsts.KSM_KEY_PREFIX); } + return true; } /** @@ -305,8 +267,19 @@ public class MetadataManagerImpl implements MetadataManager { ResultCodes.FAILED_VOLUME_NOT_FOUND); } - LevelDBKeyFilter filter = - new KeyPrefixFilter(getBucketKeyPrefix(volumeName, bucketPrefix)); + + // A bucket must start with /volume/bucket_prefix + // and exclude keys /volume/bucket_xxx/key_xxx + MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> { + if (currentKey != null) { + String bucketNamePrefix = getBucketKeyPrefix(volumeName, bucketPrefix); + String bucket = DFSUtil.bytes2String(currentKey); + return bucket.startsWith(bucketNamePrefix) && + !bucket.replaceFirst(bucketNamePrefix, "") + .contains(OzoneConsts.KSM_KEY_PREFIX); + } + return false; + }; List> rangeResult; if (!Strings.isNullOrEmpty(startBucket)) { @@ -349,7 +322,7 @@ public class MetadataManagerImpl implements MetadataManager { ResultCodes.FAILED_BUCKET_NOT_FOUND); } - LevelDBKeyFilter filter = + MetadataKeyFilter filter = new 
KeyPrefixFilter(getKeyKeyPrefix(volumeName, bucketName, keyPrefix)); List> rangeResult; @@ -427,18 +400,17 @@ public class MetadataManagerImpl implements MetadataManager { private VolumeList getVolumesByUser(byte[] userNameKey) throws KSMException { VolumeList volumes = null; - byte[] volumesInBytes = store.get(userNameKey); - if (volumesInBytes == null) { - // No volume found for this user, return an empty list - return VolumeList.newBuilder().build(); - } - try { + byte[] volumesInBytes = store.get(userNameKey); + if (volumesInBytes == null) { + // No volume found for this user, return an empty list + return VolumeList.newBuilder().build(); + } volumes = VolumeList.parseFrom(volumesInBytes); - } catch (InvalidProtocolBufferException e) { + } catch (IOException e) { throw new KSMException("Unable to get volumes info by the given user, " - + "metadata might be corrupted", - e, ResultCodes.FAILED_METADATA_ERROR); + + "metadata might be corrupted", e, + ResultCodes.FAILED_METADATA_ERROR); } return volumes; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java index 0757c9d4dbf..f4590a25320 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java @@ -26,15 +26,13 @@ import org.apache.hadoop.ozone.protocol.proto .KeySpaceManagerProtocolProtos.VolumeList; import org.apache.hadoop.ozone.protocol.proto .KeySpaceManagerProtocolProtos.VolumeInfo; -import org.iq80.leveldb.DBException; +import org.apache.hadoop.utils.BatchOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.AbstractMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import static org.apache.hadoop.ozone.ksm .KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME_DEFAULT; @@ -67,8 +65,7 @@ public class VolumeManagerImpl implements VolumeManager { // Helpers to add and delete volume from user list private void addVolumeToOwnerList(String volume, String owner, - List> putBatch) - throws IOException { + BatchOperation batchOperation) throws IOException { // Get the volume list byte[] dbUserKey = metadataManager.getUserKey(owner); byte[] volumeList = metadataManager.get(dbUserKey); @@ -88,12 +85,11 @@ public class VolumeManagerImpl implements VolumeManager { prevVolList.add(volume); VolumeList newVolList = VolumeList.newBuilder() .addAllVolumeNames(prevVolList).build(); - putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray())); + batchOperation.put(dbUserKey, newVolList.toByteArray()); } private void delVolumeFromOwnerList(String volume, String owner, - List> putBatch, - List deleteBatch) + BatchOperation batchOperation) throws IOException { // Get the volume list byte[] dbUserKey = metadataManager.getUserKey(owner); @@ -109,18 +105,14 @@ public class VolumeManagerImpl implements VolumeManager { // Remove the volume from the list prevVolList.remove(volume); if (prevVolList.size() == 0) { - deleteBatch.add(dbUserKey); + batchOperation.delete(dbUserKey); } else { VolumeList newVolList = VolumeList.newBuilder() .addAllVolumeNames(prevVolList).build(); - putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray())); + batchOperation.put(dbUserKey, newVolList.toByteArray()); } } - private Map.Entry batchEntry(byte[] key, byte[] value) { - return new 
AbstractMap.SimpleEntry<>(key, value); - } - /** * Creates a volume. * @param args - KsmVolumeArgs. @@ -129,7 +121,6 @@ public class VolumeManagerImpl implements VolumeManager { public void createVolume(KsmVolumeArgs args) throws IOException { Preconditions.checkNotNull(args); metadataManager.writeLock().lock(); - List> batch = new LinkedList<>(); try { byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume()); byte[] volumeInfo = metadataManager.get(dbVolumeKey); @@ -140,16 +131,17 @@ public class VolumeManagerImpl implements VolumeManager { throw new KSMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS); } + BatchOperation batch = new BatchOperation(); // Write the vol info VolumeInfo newVolumeInfo = args.getProtobuf(); - batch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray())); + batch.put(dbVolumeKey, newVolumeInfo.toByteArray()); // Add volume to user list addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch); - metadataManager.batchPut(batch); + metadataManager.writeBatch(batch); LOG.info("created volume:{} user:{}", args.getVolume(), args.getOwnerName()); - } catch (IOException | DBException ex) { + } catch (IOException ex) { LOG.error("Volume creation failed for user:{} volname:{}", args.getOwnerName(), args.getVolume(), ex); throw ex; @@ -169,8 +161,6 @@ public class VolumeManagerImpl implements VolumeManager { public void setOwner(String volume, String owner) throws IOException { Preconditions.checkNotNull(volume); Preconditions.checkNotNull(owner); - List> putBatch = new LinkedList<>(); - List deleteBatch = new LinkedList<>(); metadataManager.writeLock().lock(); try { byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); @@ -183,9 +173,9 @@ public class VolumeManagerImpl implements VolumeManager { KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo); Preconditions.checkState(volume.equals(volumeInfo.getVolume())); - delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), - putBatch, deleteBatch); - addVolumeToOwnerList(volume, owner, putBatch); + BatchOperation batch = new BatchOperation(); + delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch); + addVolumeToOwnerList(volume, owner, batch); KsmVolumeArgs newVolumeArgs = KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume()) @@ -195,9 +185,9 @@ public class VolumeManagerImpl implements VolumeManager { .build(); VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf(); - putBatch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray())); + batch.put(dbVolumeKey, newVolumeInfo.toByteArray()); - metadataManager.batchPutDelete(putBatch, deleteBatch); + metadataManager.writeBatch(batch); } catch (IOException ex) { LOG.error("Changing volume ownership failed for user:{} volume:{}", owner, volume, ex); @@ -285,8 +275,7 @@ public class VolumeManagerImpl implements VolumeManager { Preconditions.checkNotNull(volume); metadataManager.writeLock().lock(); try { - List> putBatch = new LinkedList<>(); - List deleteBatch = new LinkedList<>(); + BatchOperation batch = new BatchOperation(); byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); byte[] volInfo = metadataManager.get(dbVolumeKey); if (volInfo == null) { @@ -301,10 +290,9 @@ public class VolumeManagerImpl implements VolumeManager { Preconditions.checkState(volume.equals(volumeInfo.getVolume())); // delete the volume from the owner list // as well as delete the volume entry - delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), - putBatch, deleteBatch); - deleteBatch.add(dbVolumeKey); - 
metadataManager.batchPutDelete(putBatch, deleteBatch); + delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch); + batch.delete(dbVolumeKey); + metadataManager.writeBatch(batch); } catch (IOException ex) { LOG.error("Delete volume failed for volume:{}", volume, ex); throw ex; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java index 8e49b5f9397..98da9b16c45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java @@ -29,10 +29,9 @@ import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.utils.LevelDBStore; -import org.iq80.leveldb.DBIterator; -import org.iq80.leveldb.Options; -import org.iq80.leveldb.WriteBatch; +import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,13 +74,13 @@ public class BlockManagerImpl implements BlockManager { private final NodeManager nodeManager; private final Mapping containerManager; - private final LevelDBStore blockStore; + private final MetadataStore blockStore; private final Lock lock; private final long containerSize; private final long cacheSize; - private final LevelDBStore openContainerStore; + private final MetadataStore openContainerStore; private Map openContainers; private final int containerProvisionBatchSize; private final Random rand; @@ -102,12 +101,14 @@ public class BlockManagerImpl implements BlockManager { this.cacheSize = cacheSizeMB; File metaDir = OzoneUtils.getScmMetadirPath(conf); String scmMetaDataDir = metaDir.getPath(); - Options options = new Options(); - options.cacheSize(this.cacheSize * OzoneConsts.MB); // Write the block key to container name mapping. 
File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB); - blockStore = new LevelDBStore(blockContainerDbPath, options); + blockStore = MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(blockContainerDbPath) + .setCacheSize(this.cacheSize * OzoneConsts.MB) + .build(); this.containerSize = OzoneConsts.GB * conf.getInt( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, @@ -115,7 +116,12 @@ public class BlockManagerImpl implements BlockManager { // Load store of all open contains for block allocation File openContainsDbPath = new File(scmMetaDataDir, OPEN_CONTAINERS_DB); - openContainerStore = new LevelDBStore(openContainsDbPath, options); + openContainerStore = MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(openContainsDbPath) + .setCacheSize(this.cacheSize * OzoneConsts.MB) + .build(); + openContainers = new HashMap<>(); loadOpenContainers(); @@ -132,20 +138,19 @@ public class BlockManagerImpl implements BlockManager { * @throws IOException */ private void loadOpenContainers() throws IOException { - try (DBIterator iter = openContainerStore.getIterator()) { - for (iter.seekToFirst(); iter.hasNext(); iter.next()) { + try { + openContainerStore.iterate(null, (key, value) -> { try { - byte[] key = iter.peekNext().getKey(); String containerName = DFSUtil.bytes2String(key); - byte[] value = iter.peekNext().getValue(); Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value)); openContainers.put(containerName, containerUsed); LOG.debug("Loading open container: {} used : {}", containerName, containerUsed); - } catch (Exception ex) { + } catch (Exception e) { LOG.warn("Failed loading open container, continue next..."); } - } + return true; + }); } catch (IOException e) { LOG.error("Loading open container store failed." + e); throw new SCMException("Failed to load open container store", @@ -321,21 +326,19 @@ public class BlockManagerImpl implements BlockManager { throw new SCMException("Specified block key does not exist. key : " + key, FAILED_TO_FIND_BLOCK); } - try (WriteBatch wb = blockStore.createWriteBatch()) { - containerManager.getContainer( - DFSUtil.bytes2String(containerBytes)); - String deletedKeyName = getDeletedKeyName(key); - // Add a tombstone for the deleted key - wb.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes); - // Delete the block key - wb.delete(DFSUtil.string2Bytes(key)); - blockStore.commitWriteBatch(wb); - // TODO: Add async tombstone clean thread to send delete command to - // datanodes in the pipeline to clean up the blocks from containers. - // TODO: Container report handling of the deleted blocks: - // Remove tombstone and update open container usage. - // We will revisit this when the closed container replication is done. - } + BatchOperation batch = new BatchOperation(); + containerManager.getContainer(DFSUtil.bytes2String(containerBytes)); + String deletedKeyName = getDeletedKeyName(key); + // Add a tombstone for the deleted key + batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes); + // Delete the block key + batch.delete(DFSUtil.string2Bytes(key)); + blockStore.writeBatch(batch); + // TODO: Add async tombstone clean thread to send delete command to + // datanodes in the pipeline to clean up the blocks from containers. + // TODO: Container report handling of the deleted blocks: + // Remove tombstone and update open container usage. + // We will revisit this when the closed container replication is done. 
} finally { lock.unlock(); } @@ -359,4 +362,4 @@ public class BlockManagerImpl implements BlockManager { openContainerStore.close(); } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java index f558882db25..0e436e9a372 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java @@ -31,8 +31,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Pipeline; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.utils.LevelDBStore; -import org.iq80.leveldb.DBIterator; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +47,6 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.HashSet; -import java.util.Map; import java.util.Set; import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB; @@ -153,6 +152,7 @@ public class SQLCLI extends Configured implements Tool { .withDescription("specify output path") .create("o"); allOptions.addOption(outPathOption); + return allOptions; } @@ -254,22 +254,25 @@ public class SQLCLI extends Configured implements Tool { throws Exception { LOG.info("Create tables for sql container db."); File dbFile = dbPath.toFile(); - org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options(); - try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions); - Connection conn = connectDB(outPath.toString()); - DBIterator iter = dbStore.getIterator()) { + try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder() + .setDbFile(dbFile).build(); + Connection conn = connectDB(outPath.toString())) { executeSQL(conn, CREATE_CONTAINER_INFO); executeSQL(conn, CREATE_CONTAINER_MEMBERS); executeSQL(conn, CREATE_DATANODE_INFO); - iter.seekToFirst(); HashSet uuidChecked = new HashSet<>(); - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - String containerName = new String(entry.getKey(), encoding); - Pipeline pipeline = Pipeline.parseFrom(entry.getValue()); - insertContainerDB(conn, containerName, pipeline, uuidChecked); - } + dbStore.iterate(null, (key, value) -> { + String containerName = new String(key, encoding); + Pipeline pipeline = null; + pipeline = Pipeline.parseFrom(value); + try { + insertContainerDB(conn, containerName, pipeline, uuidChecked); + return true; + } catch (SQLException e) { + throw new IOException(e); + } + }); } } @@ -330,21 +333,24 @@ public class SQLCLI extends Configured implements Tool { private void convertBlockDB(Path dbPath, Path outPath) throws Exception { LOG.info("Create tables for sql block db."); File dbFile = dbPath.toFile(); - org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options(); - try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions); - Connection conn = connectDB(outPath.toString()); - DBIterator iter = dbStore.getIterator()) { + try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder() + .setDbFile(dbFile).build(); + Connection conn = connectDB(outPath.toString())) { executeSQL(conn, CREATE_BLOCK_CONTAINER); - iter.seekToFirst(); - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - String 
blockKey = DFSUtilClient.bytes2String(entry.getKey()); - String containerName = DFSUtilClient.bytes2String(entry.getValue()); + dbStore.iterate(null, (key, value) -> { + String blockKey = DFSUtilClient.bytes2String(key); + String containerName = DFSUtilClient.bytes2String(value); String insertBlockContainer = String.format( INSERT_BLOCK_CONTAINER, blockKey, containerName); - executeSQL(conn, insertBlockContainer); - } + + try { + executeSQL(conn, insertBlockContainer); + return true; + } catch (SQLException e) { + throw new IOException(e); + } + }); } } @@ -374,21 +380,23 @@ public class SQLCLI extends Configured implements Tool { private void convertNodePoolDB(Path dbPath, Path outPath) throws Exception { LOG.info("Create table for sql node pool db."); File dbFile = dbPath.toFile(); - org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options(); - try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions); - Connection conn = connectDB(outPath.toString()); - DBIterator iter = dbStore.getIterator()) { + try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder() + .setDbFile(dbFile).build(); + Connection conn = connectDB(outPath.toString())) { executeSQL(conn, CREATE_NODE_POOL); executeSQL(conn, CREATE_DATANODE_INFO); - iter.seekToFirst(); - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - DatanodeID nodeId = DatanodeID.getFromProtoBuf( - HdfsProtos.DatanodeIDProto.PARSER.parseFrom(entry.getKey())); - String blockPool = DFSUtil.bytes2String(entry.getValue()); - insertNodePoolDB(conn, blockPool, nodeId); - } + dbStore.iterate(null, (key, value) -> { + DatanodeID nodeId = DatanodeID + .getFromProtoBuf(HdfsProtos.DatanodeIDProto.PARSER.parseFrom(key)); + String blockPool = DFSUtil.bytes2String(value); + try { + insertNodePoolDB(conn, blockPool, nodeId); + return true; + } catch (SQLException e) { + throw new IOException(e); + } + }); } } @@ -423,22 +431,24 @@ public class SQLCLI extends Configured implements Tool { throws Exception { LOG.info("Create table for open container db."); File dbFile = dbPath.toFile(); - org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options(); - try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions); - Connection conn = connectDB(outPath.toString()); - DBIterator iter = dbStore.getIterator()) { + try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder() + .setDbFile(dbFile).build(); + Connection conn = connectDB(outPath.toString())) { executeSQL(conn, CREATE_OPEN_CONTAINER); - iter.seekToFirst(); - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - String containerName = DFSUtil.bytes2String(entry.getKey()); - Long containerUsed = Long.parseLong( - DFSUtil.bytes2String(entry.getValue())); - String insertOpenContainer = String.format( - INSERT_OPEN_CONTAINER, containerName, containerUsed); - executeSQL(conn, insertOpenContainer); - } + dbStore.iterate(null, (key, value) -> { + String containerName = DFSUtil.bytes2String(key); + Long containerUsed = + Long.parseLong(DFSUtil.bytes2String(value)); + String insertOpenContainer = String + .format(INSERT_OPEN_CONTAINER, containerName, containerUsed); + try { + executeSQL(conn, insertOpenContainer); + return true; + } catch (SQLException e) { + throw new IOException(e); + } + }); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java index 75694bbcdba..643779d612b 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -32,9 +32,10 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter; -import org.apache.hadoop.utils.LevelDBStore; -import org.iq80.leveldb.Options; +import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; +import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +64,7 @@ public class ContainerMapping implements Mapping { private final long cacheSize; private final Lock lock; private final Charset encoding = Charset.forName("UTF-8"); - private final LevelDBStore containerStore; + private final MetadataStore containerStore; private final ContainerPlacementPolicy placementPolicy; private final long containerSize; @@ -85,12 +86,14 @@ public class ContainerMapping implements Mapping { this.cacheSize = cacheSizeMB; File metaDir = OzoneUtils.getScmMetadirPath(conf); - Options options = new Options(); - options.cacheSize(this.cacheSize * OzoneConsts.MB); // Write the container name to pipeline mapping. File containerDBPath = new File(metaDir, CONTAINER_DB); - containerStore = new LevelDBStore(containerDBPath, options); + containerStore = MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(containerDBPath) + .setCacheSize(this.cacheSize * OzoneConsts.MB) + .build(); this.lock = new ReentrantLock(); @@ -192,7 +195,7 @@ public class ContainerMapping implements Mapping { if(containerStore.isEmpty()) { throw new IOException("No container exists in current db"); } - KeyPrefixFilter prefixFilter = new KeyPrefixFilter(prefixName); + MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefixName); byte[] startKey = startName == null ? null : DFSUtil.string2Bytes(startName); List> range = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java index 71836dbcc1d..a22cd129210 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.CommandQueue; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.node.NodePoolManager; @@ -260,8 +261,17 @@ public class ContainerReplicationManager implements Closeable { * a datanode. 
*/ public void handleContainerReport(ContainerReportsProto containerReport) { - String poolName = poolManager.getNodePool( - DatanodeID.getFromProtoBuf(containerReport.getDatanodeID())); + String poolName = null; + DatanodeID datanodeID = DatanodeID + .getFromProtoBuf(containerReport.getDatanodeID()); + try { + poolName = poolManager.getNodePool(datanodeID); + } catch (SCMException e) { + LOG.warn("Skipping processing container report from datanode {}, " + + "cause: failed to get the corresponding node pool", + datanodeID.toString(), e); + return; + } for(InProgressPool ppool : inProgressPoolList) { if(ppool.getPoolName().equalsIgnoreCase(poolName)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java index ad902bac44d..f60bdc6ced2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java @@ -110,6 +110,7 @@ public class SCMException extends IOException { FAILED_TO_FIND_CONTAINER, FAILED_TO_FIND_CONTAINER_WITH_SAPCE, BLOCK_EXISTS, - FAILED_TO_FIND_BLOCK + FAILED_TO_FIND_BLOCK, + IO_EXCEPTION } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java index f91c7052edb..d3218b7bdc9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import java.io.Closeable; +import java.io.IOException; import java.util.List; /** @@ -35,7 +36,7 @@ public interface NodePoolManager extends Closeable { * @param pool - name of the node pool. * @param node - data node. */ - void addNode(String pool, DatanodeID node); + void addNode(String pool, DatanodeID node) throws IOException; /** * Remove a node from a node pool. @@ -67,5 +68,5 @@ public interface NodePoolManager extends Closeable { * @return node pool name if it has been assigned. * null if the node has not been assigned to any node pool yet. */ - String getNodePool(DatanodeID datanodeID); + String getNodePool(DatanodeID datanodeID) throws SCMException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index a71fbcca06f..235809b4a35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -726,8 +726,16 @@ public class SCMNodeManager // TODO: define node pool policy for non-default node pool. // For now, all nodes are added to the "DefaultNodePool" upon registration // if it has not been added to any node pool yet. 
- if (nodePoolManager.getNodePool(datanodeID) == null) { - nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, datanodeID); + try { + if (nodePoolManager.getNodePool(datanodeID) == null) { + nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, + datanodeID); + } + } catch (IOException e) { + // TODO: make sure registration failure is handled correctly. + return RegisteredCommand.newBuilder() + .setErrorCode(ErrorCode.errorNodeNotPermitted) + .build(); } LOG.info("Data node with ID: {} Registered.", datanodeID.getDatanodeUuid()); @@ -823,4 +831,4 @@ public class SCMNodeManager } return nodeCountMap; } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java index 9c1821fdb75..aa34f2909ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java @@ -26,9 +26,8 @@ import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.web.utils.OzoneUtils; -import org.apache.hadoop.utils.LevelDBStore; -import org.iq80.leveldb.DBIterator; -import org.iq80.leveldb.Options; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +64,7 @@ public final class SCMNodePoolManager implements NodePoolManager { public static final String DEFAULT_NODEPOOL = "DefaultNodePool"; // DB that saves the node to node pool mapping. 
- private LevelDBStore nodePoolStore; + private MetadataStore nodePoolStore; // In-memory node pool to nodes mapping private HashMap> nodePools; @@ -84,11 +83,12 @@ public final class SCMNodePoolManager implements NodePoolManager { OZONE_SCM_DB_CACHE_SIZE_DEFAULT); File metaDir = OzoneUtils.getScmMetadirPath(conf); String scmMetaDataDir = metaDir.getPath(); - Options options = new Options(); - options.cacheSize(cacheSize * OzoneConsts.MB); - File nodePoolDBPath = new File(scmMetaDataDir, NODEPOOL_DB); - nodePoolStore = new LevelDBStore(nodePoolDBPath, options); + nodePoolStore = MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(nodePoolDBPath) + .setCacheSize(cacheSize * OzoneConsts.MB) + .build(); nodePools = new HashMap<>(); lock = new ReentrantReadWriteLock(); init(); @@ -100,14 +100,11 @@ public final class SCMNodePoolManager implements NodePoolManager { * @throws SCMException */ private void init() throws SCMException { - try (DBIterator iter = nodePoolStore.getIterator()) { - for (iter.seekToFirst(); iter.hasNext(); iter.next()) { + try { + nodePoolStore.iterate(null, (key, value) -> { try { - byte[] key = iter.peekNext().getKey(); DatanodeID nodeId = DatanodeID.getFromProtoBuf( HdfsProtos.DatanodeIDProto.PARSER.parseFrom(key)); - - byte[] value = iter.peekNext().getValue(); String poolName = DFSUtil.bytes2String(value); Set nodePool = null; @@ -119,12 +116,14 @@ public final class SCMNodePoolManager implements NodePoolManager { } nodePool.add(nodeId); if (LOG.isDebugEnabled()) { - LOG.debug("Adding node: {} to node pool: {}", nodeId, poolName); + LOG.debug("Adding node: {} to node pool: {}", + nodeId, poolName); } - } catch (Exception ex) { + } catch (IOException e) { LOG.warn("Can't add a datanode to node pool, continue next..."); } - } + return true; + }); } catch (IOException e) { LOG.error("Loading node pool error " + e); throw new SCMException("Failed to load node pool", @@ -138,7 +137,8 @@ public final class SCMNodePoolManager implements NodePoolManager { * @param node - name of the datanode. */ @Override - public void addNode(final String pool, final DatanodeID node) { + public void addNode(final String pool, final DatanodeID node) + throws IOException { Preconditions.checkNotNull(pool, "pool name is null"); Preconditions.checkNotNull(node, "node is null"); lock.writeLock().lock(); @@ -192,6 +192,10 @@ public final class SCMNodePoolManager implements NodePoolManager { throw new SCMException(String.format("Unable to find node %s from" + " pool %s in MAP.", DFSUtil.bytes2String(kName), pool), FAILED_TO_FIND_NODE_IN_POOL); } + } catch (IOException e) { + throw new SCMException("Failed to remove node " + node.toString() + + " from node pool " + pool, e, + SCMException.ResultCodes.IO_EXCEPTION); } finally { lock.writeLock().unlock(); } @@ -238,14 +242,17 @@ public final class SCMNodePoolManager implements NodePoolManager { * TODO: Put this in a in-memory map if performance is an issue. */ @Override - public String getNodePool(final DatanodeID datanodeID) { + public String getNodePool(final DatanodeID datanodeID) throws SCMException { Preconditions.checkNotNull(datanodeID, "node is null"); - byte[] result = nodePoolStore.get( - datanodeID.getProtoBufMessage().toByteArray()); - if (result == null) { - return null; + try { + byte[] result = nodePoolStore.get( + datanodeID.getProtoBufMessage().toByteArray()); + return result == null ? 
null : DFSUtil.bytes2String(result); + } catch (IOException e) { + throw new SCMException("Failed to get node pool for node " + + datanodeID.toString(), e, + SCMException.ResultCodes.IO_EXCEPTION); } - return DFSUtil.bytes2String(result); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java index d8496125437..f28555aeeac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.utils.LevelDBStore; import org.apache.hadoop.ozone.web.exceptions.ErrorTable; import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.handlers.BucketArgs; @@ -39,8 +38,8 @@ import org.apache.hadoop.ozone.web.response.ListKeys; import org.apache.hadoop.ozone.web.response.ListVolumes; import org.apache.hadoop.ozone.web.response.VolumeInfo; import org.apache.hadoop.ozone.web.response.VolumeOwner; -import org.iq80.leveldb.DBException; -import org.iq80.leveldb.DBIterator; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,8 +127,8 @@ public final class OzoneMetadataManager { private static final String USER_DB = "/user.db"; private static final String META_DB = "/metadata.db"; private static OzoneMetadataManager bm = null; - private LevelDBStore userDB; - private LevelDBStore metadataDB; + private MetadataStore userDB; + private MetadataStore metadataDB; private ReadWriteLock lock; private Charset encoding = Charset.forName("UTF-8"); private String storageRoot; @@ -157,8 +156,14 @@ public final class OzoneMetadataManager { } try { - userDB = new LevelDBStore(new File(storageRoot + USER_DB), true); - metadataDB = new LevelDBStore(new File(storageRoot + META_DB), true); + userDB = MetadataStoreBuilder.newBuilder() + .setDbFile(new File(storageRoot + USER_DB)) + .setCreateIfMissing(true) + .build(); + metadataDB = MetadataStoreBuilder.newBuilder() + .setDbFile(new File(storageRoot + META_DB)) + .setCreateIfMissing(true) + .build(); inProgressObjects = new ConcurrentHashMap<>(); } catch (IOException ex) { LOG.error("Cannot open db :" + ex.getMessage()); @@ -230,7 +235,7 @@ public final class OzoneMetadataManager { metadataDB.put(args.getVolumeName().getBytes(encoding), newVInfo.toDBString().getBytes(encoding)); - } catch (IOException | DBException ex) { + } catch (IOException ex) { throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); } finally { lock.writeLock().unlock(); @@ -295,7 +300,7 @@ public final class OzoneMetadataManager { userDB.put(args.getResourceName().getBytes(encoding), volumeList.toDBString().getBytes(encoding)); - } catch (IOException | DBException ex) { + } catch (IOException ex) { throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); } finally { lock.writeLock().unlock(); @@ -341,7 +346,7 @@ public final class OzoneMetadataManager { VolumeInfo info = VolumeInfo.parse(new String(volumeInfo, encoding)); return 
info.getOwner().getName().equals(acl.getName()); - } catch (IOException | DBException ex) { + } catch (IOException ex) { throw ErrorTable.newError(ErrorTable.SERVER_ERROR, null, ex); } finally { lock.readLock().unlock(); @@ -365,7 +370,7 @@ public final class OzoneMetadataManager { } return VolumeInfo.parse(new String(volumeInfo, encoding)); - } catch (IOException | DBException ex) { + } catch (IOException ex) { throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); } finally { lock.readLock().unlock(); @@ -405,7 +410,7 @@ public final class OzoneMetadataManager { prevKey = volName[1]; } return getFilteredVolumes(volumeList, prefix, prevKey, maxCount); - } catch (IOException | DBException ex) { + } catch (IOException ex) { throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args.getArgs(), ex); } finally { lock.readLock().unlock(); @@ -448,80 +453,54 @@ public final class OzoneMetadataManager { * @return ListVolumes. * @throws OzoneException */ - public ListVolumes listAllVolumes(ListArgs args) throws OzoneException, - IOException { + public ListVolumes listAllVolumes(ListArgs args) + throws OzoneException, IOException { String prefix = args.getPrefix(); - String prevKey = args.getPrevKey(); + final String prevKey; int maxCount = args.getMaxKeys(); String userName = null; - try (DBIterator iterator = this.userDB.getDB().iterator()) { - if (prevKey != null) { - // Format is username/volumeName - - String[] volName = args.getPrevKey().split("/"); - if (volName.length < 2) { - throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs()); - } - seekToUser(iterator, volName[0]); - userName = new String(iterator.peekNext().getKey(), encoding); - prevKey = volName[1]; - } else { - userName = getFirstUser(iterator); - } - - if (userName == null || userName.isEmpty()) { + if (args.getPrevKey() != null) { + // Format is username/volumeName + String[] volName = args.getPrevKey().split("/"); + if (volName.length < 2) { throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs()); } - ListVolumes returnSet = new ListVolumes(); - int count = maxCount - returnSet.getVolumes().size(); + byte[] userNameBytes = userDB.get(volName[0].getBytes(encoding)); + userName = new String(userNameBytes, encoding); + prevKey = volName[1]; + } else { + userName = new String(userDB.peekAround(0, null).getKey(), encoding); + prevKey = null; + } - // we need to iterate through users until we get maxcount volumes - // or no more volumes are left. - while (iterator.hasNext() && count > 0) { + if (userName == null || userName.isEmpty()) { + throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs()); + } - userName = new String(iterator.next().getKey(), encoding); - - byte[] volumeList = userDB.get(userName.getBytes(encoding)); + ListVolumes returnSet = new ListVolumes(); + // we need to iterate through users until we get maxcount volumes + // or no more volumes are left. 
+ userDB.iterate(null, (key, value) -> { + int currentSize = returnSet.getVolumes().size(); + if (currentSize < maxCount) { + String name = new String(key, encoding); + byte[] volumeList = userDB.get(name.getBytes(encoding)); if (volumeList == null) { - throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs()); + throw new IOException( + ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs())); } - - returnSet.getVolumes().addAll(getFilteredVolumes( - volumeList, prefix, prevKey, count).getVolumes()); - count = maxCount - returnSet.getVolumes().size(); + returnSet.getVolumes().addAll( + getFilteredVolumes(volumeList, prefix, prevKey, + maxCount - currentSize).getVolumes()); + return true; + } else { + return false; } - return returnSet; - } - } + }); - /** - * Returns the first user name from the UserDB. - * - * @return - UserName. - * @throws IOException - */ - String getFirstUser(DBIterator iterator) throws IOException { - iterator.seekToFirst(); - if (iterator.hasNext()) { - return new String(iterator.peekNext().getKey(), encoding); - } - return null; - } - - /** - * Reposition the DB cursor to the user name. - * - * @param iterator - Current Iterator. - * @param userName - userName to seek to - * @return - DBIterator. - * @throws IOException - */ - DBIterator seekToUser(DBIterator iterator, String userName) throws - IOException { - iterator.seek(userName.getBytes(encoding)); - return iterator; + return returnSet; } /** @@ -587,7 +566,7 @@ public final class OzoneMetadataManager { metadataDB.delete(args.getVolumeName().getBytes(encoding)); userDB.put(user.getBytes(encoding), volumeList.toDBString().getBytes(encoding)); - } catch (IOException | DBException ex) { + } catch (IOException ex) { throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); } finally { lock.writeLock().unlock(); @@ -659,7 +638,7 @@ public final class OzoneMetadataManager { metadataDB.put(args.getResourceName().getBytes(encoding), bucketInfo.toDBString().getBytes(encoding)); - } catch (IOException | DBException ex) { + } catch (IOException ex) { throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); } finally { lock.writeLock().unlock(); @@ -716,7 +695,7 @@ public final class OzoneMetadataManager { userDB.put(args.getParentName().getBytes(encoding), bucketList.toDBString().getBytes(encoding)); - } catch (IOException | DBException ex) { + } catch (IOException ex) { throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); } finally { lock.writeLock().unlock(); @@ -807,7 +786,7 @@ public final class OzoneMetadataManager { metadataDB.delete(args.getResourceName().getBytes(encoding)); userDB.put(args.getParentName().getBytes(encoding), bucketList.toDBString().getBytes(encoding)); - } catch (IOException | DBException ex) { + } catch (IOException ex) { throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); } finally { lock.writeLock().unlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BatchOperation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BatchOperation.java new file mode 100644 index 00000000000..47699ebba80 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BatchOperation.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.utils; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * An utility class to store a batch of DB write operations. + */ +public class BatchOperation { + + /** + * Enum for write operations. + */ + public enum Operation { + DELETE, PUT + } + + private List operations = + Lists.newArrayList(); + + /** + * Add a PUT operation into the batch. + */ + public void put(byte[] key, byte[] value) { + operations.add(new SingleOperation(Operation.PUT, key, value)); + } + + /** + * Add a DELETE operation into the batch. + */ + public void delete(byte[] key) { + operations.add(new SingleOperation(Operation.DELETE, key, null)); + + } + + public List getOperations() { + return operations; + } + + /** + * A SingleOperation represents a PUT or DELETE operation + * and the data the operation needs to manipulates. + */ + public static class SingleOperation { + + private Operation opt; + private byte[] key; + private byte[] value; + + public SingleOperation(Operation opt, byte[] key, byte[] value) { + this.opt = opt; + if (key == null) { + throw new IllegalArgumentException("key cannot be null"); + } + this.key = key.clone(); + this.value = value == null ? null : value.clone(); + } + + public Operation getOpt() { + return opt; + } + + public byte[] getKey() { + return key.clone(); + } + + public byte[] getValue() { + return value == null ? null : value.clone(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/EntryConsumer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/EntryConsumer.java new file mode 100644 index 00000000000..c4073986118 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/EntryConsumer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.utils; + +import java.io.IOException; + +/** + * A consumer for metadata store key-value entries. + * Used by {@link MetadataStore} class. + */ +@FunctionalInterface +public interface EntryConsumer { + + /** + * Consumes a key and value and produces a boolean result. 
+ * @param key key + * @param value value + * @return a boolean value produced by the consumer + * @throws IOException + */ + boolean consume(byte[] key, byte[] value) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java index e6f6fa6c032..415b7883a3d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java @@ -18,7 +18,8 @@ package org.apache.hadoop.utils; -import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; import org.fusesource.leveldbjni.JniDBFactory; import org.iq80.leveldb.WriteBatch; import org.iq80.leveldb.DB; @@ -30,7 +31,6 @@ import org.iq80.leveldb.ReadOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.List; @@ -41,7 +41,7 @@ import java.util.Map.Entry; /** * LevelDB interface. */ -public class LevelDBStore implements Closeable { +public class LevelDBStore implements MetadataStore { private static final Logger LOG = LoggerFactory.getLogger(LevelDBStore.class); @@ -51,23 +51,13 @@ public class LevelDBStore implements Closeable { private final Options dbOptions; private final WriteOptions writeOptions; - /** - * Opens a DB file. - * - * @param dbPath - DB File path - * @param createIfMissing - Create if missing - * @throws IOException - */ - public LevelDBStore(File dbPath, boolean createIfMissing) throws - IOException { + public LevelDBStore(File dbPath, boolean createIfMissing) + throws IOException { dbOptions = new Options(); dbOptions.createIfMissing(createIfMissing); - db = JniDBFactory.factory.open(dbPath, dbOptions); - if (db == null) { - throw new IOException("Db is null"); - } this.dbFile = dbPath; this.writeOptions = new WriteOptions().sync(true); + openDB(dbPath, dbOptions); } /** @@ -79,14 +69,23 @@ public class LevelDBStore implements Closeable { public LevelDBStore(File dbPath, Options options) throws IOException { dbOptions = options; - db = JniDBFactory.factory.open(dbPath, options); - if (db == null) { - throw new IOException("Db is null"); - } this.dbFile = dbPath; this.writeOptions = new WriteOptions().sync(true); + openDB(dbPath, dbOptions); } + private void openDB(File dbPath, Options options) throws IOException { + db = JniDBFactory.factory.open(dbPath, options); + if (LOG.isDebugEnabled()) { + LOG.debug("LevelDB successfully opened"); + LOG.debug("[Option] cacheSize = " + options.cacheSize()); + LOG.debug("[Option] createIfMissing = " + options.createIfMissing()); + LOG.debug("[Option] blockSize = " + options.blockSize()); + LOG.debug("[Option] compressionType= " + options.compressionType()); + LOG.debug("[Option] maxOpenFiles= " + options.maxOpenFiles()); + LOG.debug("[Option] writeBufferSize= "+ options.writeBufferSize()); + } + } /** * Puts a Key into file. 
@@ -94,6 +93,7 @@ public class LevelDBStore implements Closeable { * @param key - key * @param value - value */ + @Override public void put(byte[] key, byte[] value) { db.put(key, value, writeOptions); } @@ -104,6 +104,7 @@ public class LevelDBStore implements Closeable { * @param key key * @return value */ + @Override public byte[] get(byte[] key) { return db.get(key); } @@ -113,6 +114,7 @@ public class LevelDBStore implements Closeable { * * @param key - Key */ + @Override public void delete(byte[] key) { db.delete(key); } @@ -133,24 +135,15 @@ public class LevelDBStore implements Closeable { * @return boolean * @throws IOException */ + @Override public boolean isEmpty() throws IOException { - DBIterator iter = db.iterator(); - try { + try (DBIterator iter = db.iterator()) { iter.seekToFirst(); - return !iter.hasNext(); - } finally { - iter.close(); + boolean hasNext = !iter.hasNext(); + return hasNext; } } - /** - * Returns Java File Object that points to the DB. - * @return File - */ - public File getDbFile() { - return dbFile; - } - /** * Returns the actual levelDB object. * @return DB handle. @@ -168,39 +161,71 @@ public class LevelDBStore implements Closeable { } + @Override public void destroy() throws IOException { JniDBFactory.factory.destroy(dbFile, dbOptions); } - /** - * Returns a write batch for write multiple key-value pairs atomically. - * @return write batch that can be commit atomically. - */ - public WriteBatch createWriteBatch() { - return db.createWriteBatch(); + @Override + public ImmutablePair peekAround(int offset, + byte[] from) throws IOException, IllegalArgumentException { + try (DBIterator it = db.iterator()) { + if (from == null) { + it.seekToFirst(); + } else { + it.seek(from); + } + if (!it.hasNext()) { + throw new IOException("Key not found"); + } + switch (offset) { + case 0: + Entry current = it.next(); + return new ImmutablePair<>(current.getKey(), current.getValue()); + case 1: + if (it.next() != null && it.hasNext()) { + Entry next = it.peekNext(); + return new ImmutablePair<>(next.getKey(), next.getValue()); + } + break; + case -1: + if (it.hasPrev()) { + Entry prev = it.peekPrev(); + return new ImmutablePair<>(prev.getKey(), prev.getValue()); + } + break; + default: + throw new IllegalArgumentException( + "Position can only be -1, 0 " + "or 1, but found " + offset); + } + } + return null; } - /** - * Commit multiple writes of key-value pairs atomically. - * @param wb - */ - public void commitWriteBatch(WriteBatch wb) { - db.write(wb, writeOptions); - } - - /** - * Close a write batch of multiple writes to key-value pairs. - * @param wb - write batch. - * @throws IOException - */ - public void closeWriteBatch(WriteBatch wb) throws IOException { - wb.close(); + @Override + public void iterate(byte[] from, EntryConsumer consumer) + throws IOException { + try (DBIterator iter = db.iterator()) { + if (from != null) { + iter.seek(from); + } else { + iter.seekToFirst(); + } + while (iter.hasNext()) { + Entry current = iter.next(); + if (!consumer.consume(current.getKey(), + current.getValue())) { + break; + } + } + } } /** * Compacts the DB by removing deleted keys etc. * @throws IOException if there is an error. */ + @Override public void compactDB() throws IOException { if(db != null) { // From LevelDB docs : begin == null and end == null means the whole DB. @@ -208,26 +233,33 @@ public class LevelDBStore implements Closeable { } } - /** - * Returns a certain range of key value pairs as a list based on a startKey - * or count. 
- * - * @param keyPrefix start key. - * @param count number of entries to return. - * @return a range of entries or an empty list if nothing found. - * @throws IOException - * - * @see #getRangeKVs(byte[], int, LevelDBKeyFilter...) - */ - public List> getRangeKVs(byte[] keyPrefix, int count) - throws IOException { - LevelDBKeyFilter emptyFilter = (preKey, currentKey, nextKey) -> true; - return getRangeKVs(keyPrefix, count, emptyFilter); + @Override + public void writeBatch(BatchOperation operation) throws IOException { + List operations = + operation.getOperations(); + if (!operations.isEmpty()) { + try (WriteBatch writeBatch = db.createWriteBatch()) { + for (BatchOperation.SingleOperation opt : operations) { + switch (opt.getOpt()) { + case DELETE: + writeBatch.delete(opt.getKey()); + break; + case PUT: + writeBatch.put(opt.getKey(), opt.getValue()); + break; + default: + throw new IllegalArgumentException("Invalid operation " + + opt.getOpt()); + } + } + db.write(writeBatch); + } + } } /** * Returns a certain range of key value pairs as a list based on a - * startKey or count. Further a {@link LevelDBKeyFilter} can be added to + * startKey or count. Further a {@link MetadataKeyFilter} can be added to * filter keys if necessary. To prevent race conditions while listing * entries, this implementation takes a snapshot and lists the entries from * the snapshot. This may, on the other hand, cause the range result slight @@ -241,19 +273,20 @@ public class LevelDBStore implements Closeable { * The count argument is to limit number of total entries to return, * the value for count must be an integer greater than 0. *
<p>
- * This method allows to specify one or more {@link LevelDBKeyFilter} + * This method allows to specify one or more {@link MetadataKeyFilter} * to filter keys by certain condition. Once given, only the entries * whose key passes all the filters will be included in the result. * * @param startKey a start key. * @param count max number of entries to return. - * @param filters customized one or more {@link LevelDBKeyFilter}. + * @param filters customized one or more {@link MetadataKeyFilter}. * @return a list of entries found in the database. * @throws IOException if an invalid startKey is given or other I/O errors. * @throws IllegalArgumentException if count is less than 0. */ + @Override public List> getRangeKVs(byte[] startKey, - int count, LevelDBKeyFilter... filters) throws IOException { + int count, MetadataKeyFilter... filters) throws IOException { List> result = new ArrayList<>(); long start = System.currentTimeMillis(); if (count < 0) { @@ -295,8 +328,7 @@ public class LevelDBStore implements Closeable { long timeConsumed = end - start; if (LOG.isDebugEnabled()) { LOG.debug("Time consumed for getRangeKVs() is {}," - + " result length is {}.", - timeConsumed, result.size()); + + " result length is {}.", timeConsumed, result.size()); } } return result; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBKeyFilters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java similarity index 93% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBKeyFilters.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java index f813d77b0fd..1688ece4b95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBKeyFilters.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java @@ -23,12 +23,12 @@ import org.apache.hadoop.hdfs.DFSUtil; /** * An utility class to filter levelDB keys. */ -public class LevelDBKeyFilters { +public class MetadataKeyFilters { /** * Interface for levelDB key filters. */ - public interface LevelDBKeyFilter { + public interface MetadataKeyFilter { /** * Filter levelDB key with a certain condition. * @@ -44,7 +44,7 @@ public class LevelDBKeyFilters { * Utility class to filter key by a string prefix. This filter * assumes keys can be parsed to a string. */ - public static class KeyPrefixFilter implements LevelDBKeyFilter { + public static class KeyPrefixFilter implements MetadataKeyFilter { private String keyPrefix = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStore.java new file mode 100644 index 00000000000..c3b738e4047 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStore.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.utils; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Interface for key-value store that stores ozone metadata. + * Ozone metadata is stored as key value pairs, both key and value + * are arbitrary byte arrays. + */ +@InterfaceStability.Evolving +public interface MetadataStore extends Closeable{ + + /** + * Puts a key-value pair into the store. + * + * @param key metadata key + * @param value metadata value + */ + void put(byte[] key, byte[] value) throws IOException; + + /** + * @return true if the metadata store is empty. + * + * @throws IOException + */ + boolean isEmpty() throws IOException; + + /** + * Returns the value mapped to the given key in byte array. + * + * @param key metadata key + * @return value in byte array + * @throws IOException + */ + byte[] get(byte[] key) throws IOException; + + /** + * Deletes a key from the metadata store. + * + * @param key metadata key + * @throws IOException + */ + void delete(byte[] key) throws IOException; + + /** + * Returns a certain range of key value pairs as a list based on a + * startKey or count. Further a {@link MetadataKeyFilter} can be added to + * filter keys if necessary. To prevent race conditions while listing + * entries, this implementation takes a snapshot and lists the entries from + * the snapshot. This may, on the other hand, cause the range result slight + * different with actual data if data is updating concurrently. + *
<p>
+ * If the startKey is specified and found in levelDB, this key and the keys + * after this key will be included in the result. If the startKey is null + * all entries will be included as long as other conditions are satisfied. + * If the given startKey doesn't exist, an IOException will be thrown. + *
<p>
+ * The count argument is to limit number of total entries to return, + * the value for count must be an integer greater than 0. + *
<p>
+ * This method allows to specify one or more {@link MetadataKeyFilter} + * to filter keys by certain condition. Once given, only the entries + * whose key passes all the filters will be included in the result. + * + * @param startKey a start key. + * @param count max number of entries to return. + * @param filters customized one or more {@link MetadataKeyFilter}. + * @return a list of entries found in the database. + * @throws IOException if an invalid startKey is given or other I/O errors. + * @throws IllegalArgumentException if count is less than 0. + */ + List> getRangeKVs(byte[] startKey, + int count, MetadataKeyFilter... filters) + throws IOException, IllegalArgumentException; + + /** + * A batch of PUT, DELETE operations handled as a single atomic write. + * + * @throws IOException write fails + */ + void writeBatch(BatchOperation operation) throws IOException; + + /** + * Compact the entire database. + * @throws IOException + */ + void compactDB() throws IOException; + + /** + * Destroy the content of the specified database, + * a destroyed database will not be able to load again. + * Be very careful with this method. + * + * @throws IOException if I/O error happens + */ + void destroy() throws IOException; + + /** + * Seek the database to a certain key, returns the key-value + * pairs around this key based on the given offset. Note, this method + * can only support offset -1 (left), 0 (current) and 1 (right), + * any other offset given will cause a {@link IllegalArgumentException}. + * + * @param offset offset to the key + * @param from from which key + * @return a key-value pair + * @throws IOException + */ + ImmutablePair peekAround(int offset, byte[] from) + throws IOException, IllegalArgumentException; + + /** + * Iterates entries in the database from a certain key. + * Applies the given {@link EntryConsumer} to the key and value of + * each entry, the function produces a boolean result which is used + * as the criteria to exit from iteration. + * + * @param from the start key + * @param consumer + * a {@link EntryConsumer} applied to each key and value. If the consumer + * returns true, continues the iteration to next entry; otherwise exits + * the iteration. + * @throws IOException + */ + void iterate(byte[] from, EntryConsumer consumer) + throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java new file mode 100644 index 00000000000..81f2d8a5a73 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.iq80.leveldb.Options; + +import java.io.File; +import java.io.IOException; + +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB; + +/** + * Builder for metadata store. + */ +public class MetadataStoreBuilder { + + private File dbFile; + private long cacheSize; + private boolean createIfMissing = true; + private Configuration conf; + + public static MetadataStoreBuilder newBuilder() { + return new MetadataStoreBuilder(); + } + + public MetadataStoreBuilder setDbFile(File dbPath) { + this.dbFile = dbPath; + return this; + } + + public MetadataStoreBuilder setCacheSize(long cache) { + this.cacheSize = cache; + return this; + } + + public MetadataStoreBuilder setCreateIfMissing(boolean doCreate) { + this.createIfMissing = doCreate; + return this; + } + + public MetadataStoreBuilder setConf(Configuration configuration) { + this.conf = configuration; + return this; + } + + public MetadataStore build() throws IOException { + if (dbFile == null) { + throw new IllegalArgumentException("Failed to build metadata store, " + + "dbFile is required but not found"); + } + + // Build db store based on configuration + MetadataStore store = null; + String impl = conf == null ? + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT : + conf.getTrimmed(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT); + if (OZONE_METADATA_STORE_IMPL_LEVELDB.equals(impl)) { + Options options = new Options(); + options.createIfMissing(createIfMissing); + if (cacheSize > 0) { + options.cacheSize(cacheSize); + } + store = new LevelDBStore(dbFile, options); + } else if (OZONE_METADATA_STORE_IMPL_ROCKSDB.equals(impl)) { + // TODO replace with rocksDB impl + store = new LevelDBStore(dbFile, new Options()); + } else { + throw new IllegalArgumentException("Invalid argument for " + + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL + + ". Expecting " + OZONE_METADATA_STORE_IMPL_LEVELDB + + " or " + OZONE_METADATA_STORE_IMPL_ROCKSDB + + ", but met " + impl); + } + return store; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index 772c69f48d1..cd444e9e959 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -565,6 +565,17 @@ + + ozone.metastore.impl + LevelDB + + Ozone metadata store implementation. Ozone metadata are well distributed + to multiple services such as ksm, scm. They are stored in some local + key-value databases. This property determines which database library to + use. Supported value is either LevelDB or RocksDB. 
+ + + dfs.cblock.servicerpc-address diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestLevelDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java similarity index 55% rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestLevelDBStore.java rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java index c882d645aef..0000e50dba5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestLevelDBStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java @@ -17,12 +17,16 @@ */ package org.apache.hadoop.ozone; +import com.google.common.collect.Lists; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.utils.MetadataStore; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter; -import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter; -import org.apache.hadoop.utils.LevelDBStore; +import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; +import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; +import org.apache.hadoop.utils.MetadataStoreBuilder; import org.junit.Rule; import org.junit.Before; import org.junit.After; @@ -34,13 +38,15 @@ import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.ArrayList; +import java.util.UUID; /** - * Test class for {@link org.apache.hadoop.utils.LevelDBStore}. + * Test class for ozone metadata store. */ -public class TestLevelDBStore { +public class TestMetadataStore { - private LevelDBStore store; + private MetadataStore store; private File testDir; private final static int MAX_GETRANGE_LENGTH = 100; @@ -51,7 +57,16 @@ public class TestLevelDBStore { @Before public void init() throws IOException { testDir = GenericTestUtils.getTestDir(getClass().getSimpleName()); - store = new LevelDBStore(testDir, true); + + Configuration conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB); + + store = MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setCreateIfMissing(true) + .setDbFile(testDir) + .build(); // Add 20 entries. // {a0 : a-value0} to {a9 : a-value9} @@ -70,11 +85,127 @@ public class TestLevelDBStore { } private byte[] getBytes(String str) { - return DFSUtilClient.string2Bytes(str); + return str == null ? null : + DFSUtilClient.string2Bytes(str); } private String getString(byte[] bytes) { - return DFSUtilClient.bytes2String(bytes); + return bytes == null ? 
null : + DFSUtilClient.bytes2String(bytes); + } + + @Test + public void testGetDelete() throws IOException { + for (int i=0; i<10; i++) { + byte[] va = store.get(getBytes("a" + i)); + Assert.assertEquals("a-value" + i, getString(va)); + + byte[] vb = store.get(getBytes("b" + i)); + Assert.assertEquals("b-value" + i, getString(vb)); + } + + String keyToDel = "del-" + UUID.randomUUID().toString(); + store.put(getBytes(keyToDel), getBytes(keyToDel)); + Assert.assertEquals(keyToDel, getString(store.get(getBytes(keyToDel)))); + store.delete(getBytes(keyToDel)); + Assert.assertEquals(null, store.get(getBytes(keyToDel))); + } + + @Test + public void testPeekFrom() throws IOException { + // Test peek from an element that has prev as well as next + testPeek("a3", "a2", "a4"); + + // Test peek from an element that only has prev + testPeek("b9", "b8", null); + + // Test peek from an element that only has next + testPeek("a0", null, "a1"); + } + + private String getExpectedValue(String key) { + if (key == null) { + return null; + } + char[] arr = key.toCharArray(); + return new StringBuffer().append(arr[0]).append("-value") + .append(arr[arr.length - 1]).toString(); + } + + private void testPeek(String peekKey, String prevKey, String nextKey) + throws IOException { + // Look for current + String k = null; + String v = null; + ImmutablePair current = + store.peekAround(0, getBytes(peekKey)); + if (current != null) { + k = getString(current.getKey()); + v = getString(current.getValue()); + } + Assert.assertEquals(peekKey, k); + Assert.assertEquals(v, getExpectedValue(peekKey)); + + // Look for prev + k = null; + v = null; + ImmutablePair prev = + store.peekAround(-1, getBytes(peekKey)); + if (prev != null) { + k = getString(prev.getKey()); + v = getString(prev.getValue()); + } + Assert.assertEquals(prevKey, k); + Assert.assertEquals(v, getExpectedValue(prevKey)); + + // Look for next + k = null; + v = null; + ImmutablePair next = + store.peekAround(1, getBytes(peekKey)); + if (next != null) { + k = getString(next.getKey()); + v = getString(next.getValue()); + } + Assert.assertEquals(nextKey, k); + Assert.assertEquals(v, getExpectedValue(nextKey)); + } + + @Test + public void testIterateKeys() throws IOException { + // iterate keys from b0 + ArrayList result = Lists.newArrayList(); + store.iterate(getBytes("b0"), (k, v) -> { + // b-value{i} + String value = getString(v); + char num = value.charAt(value.length() - 1); + // each value adds 1 + int i = Character.getNumericValue(num) + 1; + value = value.substring(0, value.length() - 1) + i; + result.add(value); + return true; + }); + + Assert.assertFalse(result.isEmpty()); + for (int i=0; i { + result.add(getString(v)); + return true; + }); + Assert.assertTrue(result.isEmpty()); + + // iterate from the beginning + result.clear(); + store.iterate(null, (k, v) -> { + result.add(getString(v)); + return true; + }); + Assert.assertEquals(20, result.size()); } @Test @@ -104,7 +235,7 @@ public class TestLevelDBStore { // Filter keys by prefix. // It should returns all "b*" entries. - LevelDBKeyFilter filter1 = new KeyPrefixFilter("b"); + MetadataKeyFilter filter1 = new KeyPrefixFilter("b"); result = store.getRangeKVs(null, 100, filter1); Assert.assertEquals(10, result.size()); Assert.assertTrue(result.stream().allMatch(entry -> @@ -117,7 +248,7 @@ public class TestLevelDBStore { // Define a customized filter that filters keys by suffix. // Returns all "*2" entries. 
- LevelDBKeyFilter filter2 = (preKey, currentKey, nextKey) + MetadataKeyFilter filter2 = (preKey, currentKey, nextKey) -> getString(currentKey).endsWith("2"); result = store.getRangeKVs(null, MAX_GETRANGE_LENGTH, filter2); Assert.assertEquals(2, result.size()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index 3b53e8f818d..7a23b87a4de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -32,9 +32,10 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.helpers.KeyData; -import org.apache.hadoop.utils.LevelDBStore; import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -182,10 +183,13 @@ public class TestContainerPersistence { String dbPath = status.getContainer().getDBPath(); - LevelDBStore store = null; + MetadataStore store = null; try { - store = new LevelDBStore(new File(dbPath), false); - Assert.assertNotNull(store.getDB()); + store = MetadataStoreBuilder.newBuilder() + .setDbFile(new File(dbPath)) + .setCreateIfMissing(false) + .build(); + Assert.assertNotNull(store); } finally { if (store != null) { store.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java index dc42a34cf73..cb88703c8b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java @@ -41,6 +41,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -229,8 +230,8 @@ public class TestContainerReplicationManager { * @throws TimeoutException * @throws InterruptedException */ - public void testAddingNewPoolWorks() throws TimeoutException, - InterruptedException { + public void testAddingNewPoolWorks() + throws TimeoutException, InterruptedException, IOException { LogCapturer inProgressLog = LogCapturer.captureLogs( LogFactory.getLog(InProgressPool.class)); GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.ALL); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java index 382f95a627e..18a5114fd6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java @@ -274,6 +274,8 @@ public class TestKeySpaceManager { try { storageHandler.deleteVolume(createVolumeArgs); + Assert.fail("Expecting deletion should fail " + + "because volume is not empty"); } catch (IOException ex) { Assert.assertEquals(ex.getMessage(), "Delete Volume failed, error:VOLUME_NOT_EMPTY");
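The patch above replaces direct LevelDBStore usage with the MetadataStore abstraction obtained from MetadataStoreBuilder. Below is a minimal usage sketch, not part of the patch, showing how a store is built and exercised with the API introduced here; the database path is a hypothetical example and the class name is illustrative only.

```java
// Illustrative sketch (not part of the patch): building a MetadataStore and
// performing basic key-value operations with the API introduced above.
// The database path used here is a hypothetical example.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class MetadataStoreUsageSketch {

  public static void main(String[] args) throws IOException {
    Configuration conf = new OzoneConfiguration();
    // Select the backing implementation; LevelDB is the default and, in this
    // patch, the RocksDB value still falls back to the LevelDB implementation.
    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
        OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB);

    MetadataStore store = MetadataStoreBuilder.newBuilder()
        .setConf(conf)
        .setDbFile(new File("/tmp/metadata-sketch.db")) // hypothetical path
        .setCreateIfMissing(true)
        .build();

    try {
      // Plain put/get/delete on byte[] keys and values.
      store.put(bytes("a1"), bytes("a-value1"));
      store.put(bytes("b1"), bytes("b-value1"));
      System.out.println(new String(store.get(bytes("a1")),
          StandardCharsets.UTF_8));
      store.delete(bytes("a1"));

      // Range scan limited to 100 entries, keeping only keys prefixed "b".
      store.getRangeKVs(null, 100, new KeyPrefixFilter("b"))
          .forEach(entry -> System.out.println(
              new String(entry.getKey(), StandardCharsets.UTF_8)));
    } finally {
      store.close();
    }
  }

  private static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }
}
```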
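BatchOperation, iterate() and peekAround() replace the raw WriteBatch and DBIterator usage removed by this patch. The sketch below is likewise illustrative rather than part of the change; it assumes a store built as in the previous example.

```java
// Illustrative sketch (not part of the patch): atomic batched writes,
// callback-style iteration and peekAround() on a MetadataStore that is
// assumed to have been built as in the previous example.
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataStore;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public final class MetadataStoreIterationSketch {

  private MetadataStoreIterationSketch() {
  }

  static void batchAndIterate(MetadataStore store) throws IOException {
    // A batch of PUT/DELETE operations committed as one atomic write,
    // replacing the old createWriteBatch()/commitWriteBatch() pair.
    BatchOperation batch = new BatchOperation();
    batch.put(bytes("b2"), bytes("b-value2"));
    batch.delete(bytes("a1"));
    store.writeBatch(batch);

    // Iterate from the first entry; returning false from the consumer
    // stops the iteration early (here: stop after 10 entries).
    final int[] seen = {0};
    store.iterate(null, (key, value) -> {
      System.out.println(new String(key, StandardCharsets.UTF_8));
      return ++seen[0] < 10;
    });

    // Peek around a key: offset -1 looks at the previous entry, 0 at the
    // current one and 1 at the next; a missing neighbour yields null.
    ImmutablePair<byte[], byte[]> current = store.peekAround(0, bytes("b2"));
    if (current != null) {
      System.out.println("found " +
          new String(current.getKey(), StandardCharsets.UTF_8));
    }
  }

  private static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }
}
```

Both sketches assume the default LevelDB backend; once a real RocksDB implementation replaces the TODO in MetadataStoreBuilder, the same calls should work unchanged.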