HDFS-12069. Ozone: Create a general abstraction for metadata store. Contributed by Weiwei Yang.

This commit is contained in:
Weiwei Yang 2017-07-16 09:34:02 +08:00
parent 90f1d58546
commit 8f122a7505
30 changed files with 1031 additions and 494 deletions

View File

@ -61,6 +61,15 @@ public final class OzoneConfigKeys {
public static final String OZONE_CONTAINER_METADATA_DIRS =
"ozone.container.metadata.dirs";
public static final String OZONE_METADATA_STORE_IMPL =
"ozone.metastore.impl";
public static final String OZONE_METADATA_STORE_IMPL_LEVELDB =
"LevelDB";
public static final String OZONE_METADATA_STORE_IMPL_ROCKSDB =
"RocksDB";
public static final String OZONE_METADATA_STORE_IMPL_DEFAULT =
OZONE_METADATA_STORE_IMPL_LEVELDB;
public static final String OZONE_KEY_CACHE = "ozone.key.cache.size";
public static final int OZONE_KEY_CACHE_DEFAULT = 1024;

View File

@ -27,8 +27,9 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -233,7 +234,8 @@ public final class ContainerUtils {
* @param containerPath - Container Path.
* @throws IOException
*/
public static Path createMetadata(Path containerPath) throws IOException {
public static Path createMetadata(Path containerPath, Configuration conf)
throws IOException {
Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
Preconditions.checkNotNull(containerPath);
Path metadataPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
@ -243,9 +245,11 @@ public final class ContainerUtils {
throw new IOException("Unable to create directory for metadata storage." +
" Path: " + metadataPath);
}
LevelDBStore store =
new LevelDBStore(metadataPath.resolve(OzoneConsts.CONTAINER_DB)
.toFile(), true);
MetadataStore store = MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setCreateIfMissing(true)
.setDbFile(metadataPath.resolve(OzoneConsts.CONTAINER_DB).toFile())
.build();
// we close since the SCM pre-creates containers.
// we will open and put Db handle into a cache when keys are being created
@ -347,7 +351,7 @@ public final class ContainerUtils {
Preconditions.checkNotNull(containerData);
Path dbPath = Paths.get(containerData.getDBPath());
LevelDBStore db = KeyUtils.getDB(containerData, conf);
MetadataStore db = KeyUtils.getDB(containerData, conf);
// If the container is not empty and cannot be deleted forcibly,
// then throw a SCE to stop deleting.
if(!forceDelete && !db.isEmpty()) {

View File

@ -23,7 +23,8 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,20 +58,21 @@ public final class KeyUtils {
*
* @param container container.
* @param conf configuration.
* @return LevelDB handle.
* @return MetadataStore handle.
* @throws StorageContainerException
*/
public static LevelDBStore getDB(ContainerData container,
public static MetadataStore getDB(ContainerData container,
Configuration conf) throws StorageContainerException {
Preconditions.checkNotNull(container);
ContainerCache cache = ContainerCache.getInstance(conf);
Preconditions.checkNotNull(cache);
try {
LevelDBStore db = cache.getDB(container.getContainerName());
MetadataStore db = cache.getDB(container.getContainerName());
if (db == null) {
db = new LevelDBStore(
new File(container.getDBPath()),
false);
db = MetadataStoreBuilder.newBuilder()
.setDbFile(new File(container.getDBPath()))
.setCreateIfMissing(false)
.build();
cache.putDB(container.getContainerName(), db);
}
return db;
@ -103,10 +105,10 @@ public final class KeyUtils {
@SuppressWarnings("unchecked")
public static void shutdownCache(ContainerCache cache) {
Logger log = LoggerFactory.getLogger(KeyManagerImpl.class);
LevelDBStore[] handles = new LevelDBStore[cache.values().size()];
MetadataStore[] handles = new MetadataStore[cache.values().size()];
cache.values().toArray(handles);
Preconditions.checkState(handles.length == cache.values().size());
for (LevelDBStore db : handles) {
for (MetadataStore db : handles) {
try {
db.close();
} catch (IOException ex) {

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.ozone.container.common.interfaces
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.utils.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -335,7 +335,7 @@ public class ContainerManagerImpl implements ContainerManager {
ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
metadataPath = this.locationManager.getDataPath(
containerData.getContainerName());
metadataPath = ContainerUtils.createMetadata(metadataPath);
metadataPath = ContainerUtils.createMetadata(metadataPath, conf);
} else {
metadataPath = ContainerUtils.getMetadataDirectory(containerData);
}
@ -502,7 +502,7 @@ public class ContainerManagerImpl implements ContainerManager {
ContainerData containerData = readContainer(containerName);
containerData.closeContainer();
writeContainerInfo(containerData, true);
LevelDBStore db = KeyUtils.getDB(containerData, conf);
MetadataStore db = KeyUtils.getDB(containerData, conf);
// It is ok if this operation takes a bit of time.
// Close container is not expected to be instantaneous.

View File

@ -30,8 +30,9 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,12 +41,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.Result.NO_SUCH_KEY;
/**
* Key Manager impl.
*/
@ -74,8 +72,7 @@ public class KeyManagerImpl implements KeyManager {
* {@inheritDoc}
*/
@Override
public void putKey(Pipeline pipeline, KeyData data)
throws StorageContainerException {
public void putKey(Pipeline pipeline, KeyData data) throws IOException {
containerManager.readLock();
try {
// We are not locking the key manager since LevelDb serializes all actions
@ -85,7 +82,7 @@ public class KeyManagerImpl implements KeyManager {
"Container name cannot be null");
ContainerData cData = containerManager.readContainer(
pipeline.getContainerName());
LevelDBStore db = KeyUtils.getDB(cData, conf);
MetadataStore db = KeyUtils.getDB(cData, conf);
// This is a post condition that acts as a hint to the user.
// Should never fail.
@ -102,7 +99,7 @@ public class KeyManagerImpl implements KeyManager {
* {@inheritDoc}
*/
@Override
public KeyData getKey(KeyData data) throws StorageContainerException {
public KeyData getKey(KeyData data) throws IOException {
containerManager.readLock();
try {
Preconditions.checkNotNull(data, "Key data cannot be null");
@ -110,7 +107,7 @@ public class KeyManagerImpl implements KeyManager {
"Container name cannot be null");
ContainerData cData = containerManager.readContainer(data
.getContainerName());
LevelDBStore db = KeyUtils.getDB(cData, conf);
MetadataStore db = KeyUtils.getDB(cData, conf);
// This is a post condition that acts as a hint to the user.
// Should never fail.
@ -124,8 +121,6 @@ public class KeyManagerImpl implements KeyManager {
ContainerProtos.KeyData keyData =
ContainerProtos.KeyData.parseFrom(kData);
return KeyData.getFromProtoBuf(keyData);
} catch (IOException ex) {
throw new StorageContainerException(ex, IO_EXCEPTION);
} finally {
containerManager.readUnlock();
}
@ -136,7 +131,7 @@ public class KeyManagerImpl implements KeyManager {
*/
@Override
public void deleteKey(Pipeline pipeline, String keyName)
throws StorageContainerException {
throws IOException {
containerManager.readLock();
try {
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
@ -144,7 +139,7 @@ public class KeyManagerImpl implements KeyManager {
"Container name cannot be null");
ContainerData cData = containerManager.readContainer(pipeline
.getContainerName());
LevelDBStore db = KeyUtils.getDB(cData, conf);
MetadataStore db = KeyUtils.getDB(cData, conf);
// This is a post condition that acts as a hint to the user.
// Should never fail.
@ -171,32 +166,28 @@ public class KeyManagerImpl implements KeyManager {
@Override
public List<KeyData> listKey(
Pipeline pipeline, String prefix, String startKey, int count)
throws StorageContainerException {
throws IOException {
Preconditions.checkNotNull(pipeline,
"Pipeline cannot be null.");
Preconditions.checkArgument(count > 0,
"Count must be a positive number.");
ContainerData cData = containerManager.readContainer(pipeline
.getContainerName());
LevelDBStore db = KeyUtils.getDB(cData, conf);
try {
List<KeyData> result = new ArrayList<KeyData>();
byte[] startKeyInBytes = startKey == null ? null :
DFSUtil.string2Bytes(startKey);
KeyPrefixFilter prefixFilter = new KeyPrefixFilter(prefix);
List<Map.Entry<byte[], byte[]>> range =
db.getRangeKVs(startKeyInBytes, count, prefixFilter);
for(Map.Entry<byte[], byte[]> entry : range) {
String keyName = KeyUtils.getKeyName(entry.getKey());
KeyData value = KeyUtils.getKeyData(entry.getValue());
KeyData data = new KeyData(value.getContainerName(), keyName);
result.add(data);
}
return result;
} catch (IOException e) {
throw new StorageContainerException(e,
ContainerProtos.Result.IO_EXCEPTION);
MetadataStore db = KeyUtils.getDB(cData, conf);
List<KeyData> result = new ArrayList<KeyData>();
byte[] startKeyInBytes = startKey == null ? null :
DFSUtil.string2Bytes(startKey);
MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefix);
List<Map.Entry<byte[], byte[]>> range =
db.getRangeKVs(startKeyInBytes, count, prefixFilter);
for (Map.Entry<byte[], byte[]> entry : range) {
String keyName = KeyUtils.getKeyName(entry.getKey());
KeyData value = KeyUtils.getKeyData(entry.getValue());
KeyData data = new KeyData(value.getContainerName(), keyName);
result.add(data);
}
return result;
}
/**

View File

@ -21,6 +21,7 @@ import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.IOException;
import java.util.List;
/**
@ -32,18 +33,18 @@ public interface KeyManager {
*
* @param pipeline - Pipeline.
* @param data - Key Data.
* @throws StorageContainerException
* @throws IOException
*/
void putKey(Pipeline pipeline, KeyData data) throws StorageContainerException;
void putKey(Pipeline pipeline, KeyData data) throws IOException;
/**
* Gets an existing key.
*
* @param data - Key Data.
* @return Key Data.
* @throws StorageContainerException
* @throws IOException
*/
KeyData getKey(KeyData data) throws StorageContainerException;
KeyData getKey(KeyData data) throws IOException;
/**
* Deletes an existing Key.
@ -53,7 +54,7 @@ public interface KeyManager {
* @throws StorageContainerException
*/
void deleteKey(Pipeline pipeline, String keyName)
throws StorageContainerException;
throws IOException;
/**
* List keys in a container.
@ -65,7 +66,7 @@ public interface KeyManager {
* @return List of Keys that match the criteria.
*/
List<KeyData> listKey(Pipeline pipeline, String prefix, String startKey,
int count) throws StorageContainerException;
int count) throws IOException;
/**
* Shutdown keyManager.

View File

@ -24,7 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.utils.MetadataStore;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
@ -69,7 +69,7 @@ public final class ContainerCache extends LRUMap {
protected boolean removeLRU(LinkEntry entry) {
lock.lock();
try {
LevelDBStore db = (LevelDBStore) entry.getValue();
MetadataStore db = (MetadataStore) entry.getValue();
db.close();
} catch (IOException e) {
LOG.error("Error closing DB. Container: " + entry.getKey().toString(), e);
@ -83,14 +83,14 @@ public final class ContainerCache extends LRUMap {
* Returns a DB handle if available, null otherwise.
*
* @param containerName - Name of the container.
* @return OzoneLevelDBStore.
* @return MetadataStore.
*/
public LevelDBStore getDB(String containerName) {
public MetadataStore getDB(String containerName) {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
lock.lock();
try {
return (LevelDBStore) this.get(containerName);
return (MetadataStore) this.get(containerName);
} finally {
lock.unlock();
}
@ -106,7 +106,7 @@ public final class ContainerCache extends LRUMap {
Preconditions.checkState(!containerName.isEmpty());
lock.lock();
try {
LevelDBStore db = this.getDB(containerName);
MetadataStore db = this.getDB(containerName);
if (db != null) {
try {
db.close();
@ -126,7 +126,7 @@ public final class ContainerCache extends LRUMap {
* @param containerName - Name of the container
* @param db - DB handle
*/
public void putDB(String containerName, LevelDBStore db) {
public void putDB(String containerName, MetadataStore db) {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
lock.lock();

View File

@ -19,10 +19,10 @@ package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.utils.BatchOperation;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
/**
@ -56,36 +56,27 @@ public interface MetadataManager {
* @param key - key
* @return value
*/
byte[] get(byte[] key);
byte[] get(byte[] key) throws IOException;
/**
* Puts a Key into Metadata DB.
* @param key - key
* @param value - value
*/
void put(byte[] key, byte[] value);
void put(byte[] key, byte[] value) throws IOException;
/**
* Deletes a Key from Metadata DB.
* @param key - key
*/
void delete(byte[] key);
void delete(byte[] key) throws IOException;
/**
* Performs batch Put and Delete to Metadata DB.
* Can be used to do multiple puts and deletes atomically.
* @param putList - list of Key/Value to put into DB
* @param delList - list of Key to delete from DB
* Atomic write a batch of operations.
* @param batch
* @throws IOException
*/
void batchPutDelete(List<Map.Entry<byte[], byte[]>> putList,
List<byte[]> delList) throws IOException;
/**
* Performs a batch Put to Metadata DB.
* Can be used to do multiple puts atomically.
* @param putList - list of Key/Value to put into DB
*/
void batchPut(List<Map.Entry<byte[], byte[]>> putList) throws IOException;
void writeBatch(BatchOperation batch) throws IOException;
/**
* Given a volume return the corresponding DB key.
@ -120,7 +111,7 @@ public interface MetadataManager {
*
* @param key - key name
*/
void deleteKey(byte[] key);
void deleteKey(byte[] key) throws IOException;
/**
* Given a volume, check if it is empty,

View File

@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.ksm;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
@ -32,12 +32,11 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyI
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import java.io.File;
import java.io.IOException;
@ -57,9 +56,9 @@ import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
/**
* KSM metadata manager interface.
*/
public class MetadataManagerImpl implements MetadataManager {
public class MetadataManagerImpl implements MetadataManager {
private final LevelDBStore store;
private final MetadataStore store;
private final ReadWriteLock lock;
@ -67,10 +66,12 @@ public class MetadataManagerImpl implements MetadataManager {
File metaDir = OzoneUtils.getScmMetadirPath(conf);
final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
Options options = new Options();
options.cacheSize(cacheSize * OzoneConsts.MB);
File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
this.store = new LevelDBStore(ksmDBFile, options);
this.store = MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setDbFile(ksmDBFile)
.setCacheSize(cacheSize * OzoneConsts.MB)
.build();
this.lock = new ReentrantReadWriteLock();
}
@ -153,7 +154,7 @@ public class MetadataManagerImpl implements MetadataManager {
* @param key - key name
*/
@Override
public void deleteKey(byte[] key) {
public void deleteKey(byte[] key) throws IOException {
store.delete(key);
}
@ -181,7 +182,7 @@ public class MetadataManagerImpl implements MetadataManager {
* @return value
*/
@Override
public byte[] get(byte[] key) {
public byte[] get(byte[] key) throws IOException {
return store.get(key);
}
@ -191,7 +192,7 @@ public class MetadataManagerImpl implements MetadataManager {
* @param value - value
*/
@Override
public void put(byte[] key, byte[] value) {
public void put(byte[] key, byte[] value) throws IOException {
store.put(key, value);
}
@ -199,45 +200,13 @@ public class MetadataManagerImpl implements MetadataManager {
* Deletes a Key from Metadata DB.
* @param key - key
*/
public void delete(byte[] key) {
public void delete(byte[] key) throws IOException {
store.delete(key);
}
/**
* Performs a batch Put and Delete from Metadata DB.
* Can be used to do multiple puts and deletes atomically.
* @param putList - list of key and value pairs to put to Metadata DB.
* @param delList - list of keys to delete from Metadata DB.
*/
@Override
public void batchPutDelete(List<Map.Entry<byte[], byte[]>> putList,
List<byte[]> delList)
throws IOException {
WriteBatch batch = store.createWriteBatch();
putList.forEach(entry -> batch.put(entry.getKey(), entry.getValue()));
delList.forEach(entry -> batch.delete(entry));
try {
store.commitWriteBatch(batch);
} finally {
store.closeWriteBatch(batch);
}
}
/**
* Performs a batch Put to Metadata DB.
* Can be used to do multiple puts atomically.
* @param list - list of Map.Entry
*/
@Override
public void batchPut(List<Map.Entry<byte[], byte[]>> list)
throws IOException {
WriteBatch batch = store.createWriteBatch();
list.forEach(entry -> batch.put(entry.getKey(), entry.getValue()));
try {
store.commitWriteBatch(batch);
} finally {
store.closeWriteBatch(batch);
}
public void writeBatch(BatchOperation batch) throws IOException {
this.store.writeBatch(batch);
}
/**
@ -246,21 +215,17 @@ public class MetadataManagerImpl implements MetadataManager {
* @return true if the volume is empty
*/
public boolean isVolumeEmpty(String volume) throws IOException {
try (DBIterator iterator = store.getIterator()) {
String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
+ OzoneConsts.KSM_BUCKET_PREFIX;
byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
// Seek to the root of the volume and look for the next key
iterator.seek(dbVolumeRootKey);
if (iterator.hasNext()) {
String firstBucketKey = DFSUtil.bytes2String(iterator.next().getKey());
// if the key starts with /<volume name>/
// then there is at least one bucket
return !firstBucketKey.startsWith(dbVolumeRootName);
} else {
return true;
}
String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume;
byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
// Seek to the root of the volume and look for the next key
ImmutablePair<byte[], byte[]> volumeRoot =
store.peekAround(1, dbVolumeRootKey);
if (volumeRoot != null) {
String firstBucketKey = DFSUtil.bytes2String(volumeRoot.getKey());
return !firstBucketKey.startsWith(dbVolumeRootName
+ OzoneConsts.KSM_BUCKET_PREFIX);
}
return true;
}
/**
@ -272,18 +237,15 @@ public class MetadataManagerImpl implements MetadataManager {
*/
public boolean isBucketEmpty(String volume, String bucket)
throws IOException {
try (DBIterator iterator = store.getIterator()) {
String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
+ OzoneConsts.KSM_BUCKET_PREFIX + bucket
+ OzoneConsts.KSM_KEY_PREFIX;
byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
iterator.seek(keyRoot);
if(iterator.hasNext()) {
return !DFSUtil.bytes2String(iterator.next().getKey())
.startsWith(keyRootName);
}
return true;
String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
+ OzoneConsts.KSM_BUCKET_PREFIX + bucket;
byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
ImmutablePair<byte[], byte[]> firstKey = store.peekAround(1, keyRoot);
if (firstKey != null) {
return !DFSUtil.bytes2String(firstKey.getKey())
.startsWith(keyRootName + OzoneConsts.KSM_KEY_PREFIX);
}
return true;
}
/**
@ -305,8 +267,19 @@ public class MetadataManagerImpl implements MetadataManager {
ResultCodes.FAILED_VOLUME_NOT_FOUND);
}
LevelDBKeyFilter filter =
new KeyPrefixFilter(getBucketKeyPrefix(volumeName, bucketPrefix));
// A bucket must start with /volume/bucket_prefix
// and exclude keys /volume/bucket_xxx/key_xxx
MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> {
if (currentKey != null) {
String bucketNamePrefix = getBucketKeyPrefix(volumeName, bucketPrefix);
String bucket = DFSUtil.bytes2String(currentKey);
return bucket.startsWith(bucketNamePrefix) &&
!bucket.replaceFirst(bucketNamePrefix, "")
.contains(OzoneConsts.KSM_KEY_PREFIX);
}
return false;
};
List<Map.Entry<byte[], byte[]>> rangeResult;
if (!Strings.isNullOrEmpty(startBucket)) {
@ -349,7 +322,7 @@ public class MetadataManagerImpl implements MetadataManager {
ResultCodes.FAILED_BUCKET_NOT_FOUND);
}
LevelDBKeyFilter filter =
MetadataKeyFilter filter =
new KeyPrefixFilter(getKeyKeyPrefix(volumeName, bucketName, keyPrefix));
List<Map.Entry<byte[], byte[]>> rangeResult;
@ -427,18 +400,17 @@ public class MetadataManagerImpl implements MetadataManager {
private VolumeList getVolumesByUser(byte[] userNameKey)
throws KSMException {
VolumeList volumes = null;
byte[] volumesInBytes = store.get(userNameKey);
if (volumesInBytes == null) {
// No volume found for this user, return an empty list
return VolumeList.newBuilder().build();
}
try {
byte[] volumesInBytes = store.get(userNameKey);
if (volumesInBytes == null) {
// No volume found for this user, return an empty list
return VolumeList.newBuilder().build();
}
volumes = VolumeList.parseFrom(volumesInBytes);
} catch (InvalidProtocolBufferException e) {
} catch (IOException e) {
throw new KSMException("Unable to get volumes info by the given user, "
+ "metadata might be corrupted",
e, ResultCodes.FAILED_METADATA_ERROR);
+ "metadata might be corrupted", e,
ResultCodes.FAILED_METADATA_ERROR);
}
return volumes;
}

View File

@ -26,15 +26,13 @@ import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.iq80.leveldb.DBException;
import org.apache.hadoop.utils.BatchOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.ozone.ksm
.KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME_DEFAULT;
@ -67,8 +65,7 @@ public class VolumeManagerImpl implements VolumeManager {
// Helpers to add and delete volume from user list
private void addVolumeToOwnerList(String volume, String owner,
List<Map.Entry<byte[], byte[]>> putBatch)
throws IOException {
BatchOperation batchOperation) throws IOException {
// Get the volume list
byte[] dbUserKey = metadataManager.getUserKey(owner);
byte[] volumeList = metadataManager.get(dbUserKey);
@ -88,12 +85,11 @@ public class VolumeManagerImpl implements VolumeManager {
prevVolList.add(volume);
VolumeList newVolList = VolumeList.newBuilder()
.addAllVolumeNames(prevVolList).build();
putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray()));
batchOperation.put(dbUserKey, newVolList.toByteArray());
}
private void delVolumeFromOwnerList(String volume, String owner,
List<Map.Entry<byte[], byte[]>> putBatch,
List<byte[]> deleteBatch)
BatchOperation batchOperation)
throws IOException {
// Get the volume list
byte[] dbUserKey = metadataManager.getUserKey(owner);
@ -109,18 +105,14 @@ public class VolumeManagerImpl implements VolumeManager {
// Remove the volume from the list
prevVolList.remove(volume);
if (prevVolList.size() == 0) {
deleteBatch.add(dbUserKey);
batchOperation.delete(dbUserKey);
} else {
VolumeList newVolList = VolumeList.newBuilder()
.addAllVolumeNames(prevVolList).build();
putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray()));
batchOperation.put(dbUserKey, newVolList.toByteArray());
}
}
private Map.Entry<byte[], byte[]> batchEntry(byte[] key, byte[] value) {
return new AbstractMap.SimpleEntry<>(key, value);
}
/**
* Creates a volume.
* @param args - KsmVolumeArgs.
@ -129,7 +121,6 @@ public class VolumeManagerImpl implements VolumeManager {
public void createVolume(KsmVolumeArgs args) throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
List<Map.Entry<byte[], byte[]>> batch = new LinkedList<>();
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
byte[] volumeInfo = metadataManager.get(dbVolumeKey);
@ -140,16 +131,17 @@ public class VolumeManagerImpl implements VolumeManager {
throw new KSMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
}
BatchOperation batch = new BatchOperation();
// Write the vol info
VolumeInfo newVolumeInfo = args.getProtobuf();
batch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
// Add volume to user list
addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
metadataManager.batchPut(batch);
metadataManager.writeBatch(batch);
LOG.info("created volume:{} user:{}",
args.getVolume(), args.getOwnerName());
} catch (IOException | DBException ex) {
} catch (IOException ex) {
LOG.error("Volume creation failed for user:{} volname:{}",
args.getOwnerName(), args.getVolume(), ex);
throw ex;
@ -169,8 +161,6 @@ public class VolumeManagerImpl implements VolumeManager {
public void setOwner(String volume, String owner) throws IOException {
Preconditions.checkNotNull(volume);
Preconditions.checkNotNull(owner);
List<Map.Entry<byte[], byte[]>> putBatch = new LinkedList<>();
List<byte[]> deleteBatch = new LinkedList<>();
metadataManager.writeLock().lock();
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
@ -183,9 +173,9 @@ public class VolumeManagerImpl implements VolumeManager {
KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(),
putBatch, deleteBatch);
addVolumeToOwnerList(volume, owner, putBatch);
BatchOperation batch = new BatchOperation();
delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
addVolumeToOwnerList(volume, owner, batch);
KsmVolumeArgs newVolumeArgs =
KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
@ -195,9 +185,9 @@ public class VolumeManagerImpl implements VolumeManager {
.build();
VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
putBatch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
metadataManager.batchPutDelete(putBatch, deleteBatch);
metadataManager.writeBatch(batch);
} catch (IOException ex) {
LOG.error("Changing volume ownership failed for user:{} volume:{}",
owner, volume, ex);
@ -285,8 +275,7 @@ public class VolumeManagerImpl implements VolumeManager {
Preconditions.checkNotNull(volume);
metadataManager.writeLock().lock();
try {
List<Map.Entry<byte[], byte[]>> putBatch = new LinkedList<>();
List<byte[]> deleteBatch = new LinkedList<>();
BatchOperation batch = new BatchOperation();
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
byte[] volInfo = metadataManager.get(dbVolumeKey);
if (volInfo == null) {
@ -301,10 +290,9 @@ public class VolumeManagerImpl implements VolumeManager {
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
// delete the volume from the owner list
// as well as delete the volume entry
delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(),
putBatch, deleteBatch);
deleteBatch.add(dbVolumeKey);
metadataManager.batchPutDelete(putBatch, deleteBatch);
delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch);
batch.delete(dbVolumeKey);
metadataManager.writeBatch(batch);
} catch (IOException ex) {
LOG.error("Delete volume failed for volume:{}", volume, ex);
throw ex;

View File

@ -29,10 +29,9 @@ import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -75,13 +74,13 @@ public class BlockManagerImpl implements BlockManager {
private final NodeManager nodeManager;
private final Mapping containerManager;
private final LevelDBStore blockStore;
private final MetadataStore blockStore;
private final Lock lock;
private final long containerSize;
private final long cacheSize;
private final LevelDBStore openContainerStore;
private final MetadataStore openContainerStore;
private Map<String, Long> openContainers;
private final int containerProvisionBatchSize;
private final Random rand;
@ -102,12 +101,14 @@ public class BlockManagerImpl implements BlockManager {
this.cacheSize = cacheSizeMB;
File metaDir = OzoneUtils.getScmMetadirPath(conf);
String scmMetaDataDir = metaDir.getPath();
Options options = new Options();
options.cacheSize(this.cacheSize * OzoneConsts.MB);
// Write the block key to container name mapping.
File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB);
blockStore = new LevelDBStore(blockContainerDbPath, options);
blockStore = MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setDbFile(blockContainerDbPath)
.setCacheSize(this.cacheSize * OzoneConsts.MB)
.build();
this.containerSize = OzoneConsts.GB * conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
@ -115,7 +116,12 @@ public class BlockManagerImpl implements BlockManager {
// Load store of all open contains for block allocation
File openContainsDbPath = new File(scmMetaDataDir, OPEN_CONTAINERS_DB);
openContainerStore = new LevelDBStore(openContainsDbPath, options);
openContainerStore = MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setDbFile(openContainsDbPath)
.setCacheSize(this.cacheSize * OzoneConsts.MB)
.build();
openContainers = new HashMap<>();
loadOpenContainers();
@ -132,20 +138,19 @@ public class BlockManagerImpl implements BlockManager {
* @throws IOException
*/
private void loadOpenContainers() throws IOException {
try (DBIterator iter = openContainerStore.getIterator()) {
for (iter.seekToFirst(); iter.hasNext(); iter.next()) {
try {
openContainerStore.iterate(null, (key, value) -> {
try {
byte[] key = iter.peekNext().getKey();
String containerName = DFSUtil.bytes2String(key);
byte[] value = iter.peekNext().getValue();
Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
openContainers.put(containerName, containerUsed);
LOG.debug("Loading open container: {} used : {}", containerName,
containerUsed);
} catch (Exception ex) {
} catch (Exception e) {
LOG.warn("Failed loading open container, continue next...");
}
}
return true;
});
} catch (IOException e) {
LOG.error("Loading open container store failed." + e);
throw new SCMException("Failed to load open container store",
@ -321,21 +326,19 @@ public class BlockManagerImpl implements BlockManager {
throw new SCMException("Specified block key does not exist. key : " +
key, FAILED_TO_FIND_BLOCK);
}
try (WriteBatch wb = blockStore.createWriteBatch()) {
containerManager.getContainer(
DFSUtil.bytes2String(containerBytes));
String deletedKeyName = getDeletedKeyName(key);
// Add a tombstone for the deleted key
wb.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
// Delete the block key
wb.delete(DFSUtil.string2Bytes(key));
blockStore.commitWriteBatch(wb);
// TODO: Add async tombstone clean thread to send delete command to
// datanodes in the pipeline to clean up the blocks from containers.
// TODO: Container report handling of the deleted blocks:
// Remove tombstone and update open container usage.
// We will revisit this when the closed container replication is done.
}
BatchOperation batch = new BatchOperation();
containerManager.getContainer(DFSUtil.bytes2String(containerBytes));
String deletedKeyName = getDeletedKeyName(key);
// Add a tombstone for the deleted key
batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
// Delete the block key
batch.delete(DFSUtil.string2Bytes(key));
blockStore.writeBatch(batch);
// TODO: Add async tombstone clean thread to send delete command to
// datanodes in the pipeline to clean up the blocks from containers.
// TODO: Container report handling of the deleted blocks:
// Remove tombstone and update open container usage.
// We will revisit this when the closed container replication is done.
} finally {
lock.unlock();
}
@ -359,4 +362,4 @@ public class BlockManagerImpl implements BlockManager {
openContainerStore.close();
}
}
}
}

View File

@ -31,8 +31,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Pipeline;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBIterator;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,7 +47,6 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
@ -153,6 +152,7 @@ public class SQLCLI extends Configured implements Tool {
.withDescription("specify output path")
.create("o");
allOptions.addOption(outPathOption);
return allOptions;
}
@ -254,22 +254,25 @@ public class SQLCLI extends Configured implements Tool {
throws Exception {
LOG.info("Create tables for sql container db.");
File dbFile = dbPath.toFile();
org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
Connection conn = connectDB(outPath.toString());
DBIterator iter = dbStore.getIterator()) {
try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
.setDbFile(dbFile).build();
Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_CONTAINER_INFO);
executeSQL(conn, CREATE_CONTAINER_MEMBERS);
executeSQL(conn, CREATE_DATANODE_INFO);
iter.seekToFirst();
HashSet<String> uuidChecked = new HashSet<>();
while (iter.hasNext()) {
Map.Entry<byte[], byte[]> entry = iter.next();
String containerName = new String(entry.getKey(), encoding);
Pipeline pipeline = Pipeline.parseFrom(entry.getValue());
insertContainerDB(conn, containerName, pipeline, uuidChecked);
}
dbStore.iterate(null, (key, value) -> {
String containerName = new String(key, encoding);
Pipeline pipeline = null;
pipeline = Pipeline.parseFrom(value);
try {
insertContainerDB(conn, containerName, pipeline, uuidChecked);
return true;
} catch (SQLException e) {
throw new IOException(e);
}
});
}
}
@ -330,21 +333,24 @@ public class SQLCLI extends Configured implements Tool {
private void convertBlockDB(Path dbPath, Path outPath) throws Exception {
LOG.info("Create tables for sql block db.");
File dbFile = dbPath.toFile();
org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
Connection conn = connectDB(outPath.toString());
DBIterator iter = dbStore.getIterator()) {
try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
.setDbFile(dbFile).build();
Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_BLOCK_CONTAINER);
iter.seekToFirst();
while (iter.hasNext()) {
Map.Entry<byte[], byte[]> entry = iter.next();
String blockKey = DFSUtilClient.bytes2String(entry.getKey());
String containerName = DFSUtilClient.bytes2String(entry.getValue());
dbStore.iterate(null, (key, value) -> {
String blockKey = DFSUtilClient.bytes2String(key);
String containerName = DFSUtilClient.bytes2String(value);
String insertBlockContainer = String.format(
INSERT_BLOCK_CONTAINER, blockKey, containerName);
executeSQL(conn, insertBlockContainer);
}
try {
executeSQL(conn, insertBlockContainer);
return true;
} catch (SQLException e) {
throw new IOException(e);
}
});
}
}
@ -374,21 +380,23 @@ public class SQLCLI extends Configured implements Tool {
private void convertNodePoolDB(Path dbPath, Path outPath) throws Exception {
LOG.info("Create table for sql node pool db.");
File dbFile = dbPath.toFile();
org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
Connection conn = connectDB(outPath.toString());
DBIterator iter = dbStore.getIterator()) {
try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
.setDbFile(dbFile).build();
Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_NODE_POOL);
executeSQL(conn, CREATE_DATANODE_INFO);
iter.seekToFirst();
while (iter.hasNext()) {
Map.Entry<byte[], byte[]> entry = iter.next();
DatanodeID nodeId = DatanodeID.getFromProtoBuf(
HdfsProtos.DatanodeIDProto.PARSER.parseFrom(entry.getKey()));
String blockPool = DFSUtil.bytes2String(entry.getValue());
insertNodePoolDB(conn, blockPool, nodeId);
}
dbStore.iterate(null, (key, value) -> {
DatanodeID nodeId = DatanodeID
.getFromProtoBuf(HdfsProtos.DatanodeIDProto.PARSER.parseFrom(key));
String blockPool = DFSUtil.bytes2String(value);
try {
insertNodePoolDB(conn, blockPool, nodeId);
return true;
} catch (SQLException e) {
throw new IOException(e);
}
});
}
}
@ -423,22 +431,24 @@ public class SQLCLI extends Configured implements Tool {
throws Exception {
LOG.info("Create table for open container db.");
File dbFile = dbPath.toFile();
org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
Connection conn = connectDB(outPath.toString());
DBIterator iter = dbStore.getIterator()) {
try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
.setDbFile(dbFile).build();
Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_OPEN_CONTAINER);
iter.seekToFirst();
while (iter.hasNext()) {
Map.Entry<byte[], byte[]> entry = iter.next();
String containerName = DFSUtil.bytes2String(entry.getKey());
Long containerUsed = Long.parseLong(
DFSUtil.bytes2String(entry.getValue()));
String insertOpenContainer = String.format(
INSERT_OPEN_CONTAINER, containerName, containerUsed);
executeSQL(conn, insertOpenContainer);
}
dbStore.iterate(null, (key, value) -> {
String containerName = DFSUtil.bytes2String(key);
Long containerUsed =
Long.parseLong(DFSUtil.bytes2String(value));
String insertOpenContainer = String
.format(INSERT_OPEN_CONTAINER, containerName, containerUsed);
try {
executeSQL(conn, insertOpenContainer);
return true;
} catch (SQLException e) {
throw new IOException(e);
}
});
}
}

View File

@ -32,9 +32,10 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.Options;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,7 +64,7 @@ public class ContainerMapping implements Mapping {
private final long cacheSize;
private final Lock lock;
private final Charset encoding = Charset.forName("UTF-8");
private final LevelDBStore containerStore;
private final MetadataStore containerStore;
private final ContainerPlacementPolicy placementPolicy;
private final long containerSize;
@ -85,12 +86,14 @@ public class ContainerMapping implements Mapping {
this.cacheSize = cacheSizeMB;
File metaDir = OzoneUtils.getScmMetadirPath(conf);
Options options = new Options();
options.cacheSize(this.cacheSize * OzoneConsts.MB);
// Write the container name to pipeline mapping.
File containerDBPath = new File(metaDir, CONTAINER_DB);
containerStore = new LevelDBStore(containerDBPath, options);
containerStore = MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setDbFile(containerDBPath)
.setCacheSize(this.cacheSize * OzoneConsts.MB)
.build();
this.lock = new ReentrantLock();
@ -192,7 +195,7 @@ public class ContainerMapping implements Mapping {
if(containerStore.isEmpty()) {
throw new IOException("No container exists in current db");
}
KeyPrefixFilter prefixFilter = new KeyPrefixFilter(prefixName);
MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefixName);
byte[] startKey = startName == null ?
null : DFSUtil.string2Bytes(startName);
List<Map.Entry<byte[], byte[]>> range =

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.CommandQueue;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
@ -260,8 +261,17 @@ public class ContainerReplicationManager implements Closeable {
* a datanode.
*/
public void handleContainerReport(ContainerReportsProto containerReport) {
String poolName = poolManager.getNodePool(
DatanodeID.getFromProtoBuf(containerReport.getDatanodeID()));
String poolName = null;
DatanodeID datanodeID = DatanodeID
.getFromProtoBuf(containerReport.getDatanodeID());
try {
poolName = poolManager.getNodePool(datanodeID);
} catch (SCMException e) {
LOG.warn("Skipping processing container report from datanode {}, "
+ "cause: failed to get the corresponding node pool",
datanodeID.toString(), e);
return;
}
for(InProgressPool ppool : inProgressPoolList) {
if(ppool.getPoolName().equalsIgnoreCase(poolName)) {

View File

@ -110,6 +110,7 @@ public class SCMException extends IOException {
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SAPCE,
BLOCK_EXISTS,
FAILED_TO_FIND_BLOCK
FAILED_TO_FIND_BLOCK,
IO_EXCEPTION
}
}

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
/**
@ -35,7 +36,7 @@ public interface NodePoolManager extends Closeable {
* @param pool - name of the node pool.
* @param node - data node.
*/
void addNode(String pool, DatanodeID node);
void addNode(String pool, DatanodeID node) throws IOException;
/**
* Remove a node from a node pool.
@ -67,5 +68,5 @@ public interface NodePoolManager extends Closeable {
* @return node pool name if it has been assigned.
* null if the node has not been assigned to any node pool yet.
*/
String getNodePool(DatanodeID datanodeID);
String getNodePool(DatanodeID datanodeID) throws SCMException;
}

View File

@ -726,8 +726,16 @@ public class SCMNodeManager
// TODO: define node pool policy for non-default node pool.
// For now, all nodes are added to the "DefaultNodePool" upon registration
// if it has not been added to any node pool yet.
if (nodePoolManager.getNodePool(datanodeID) == null) {
nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, datanodeID);
try {
if (nodePoolManager.getNodePool(datanodeID) == null) {
nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL,
datanodeID);
}
} catch (IOException e) {
// TODO: make sure registration failure is handled correctly.
return RegisteredCommand.newBuilder()
.setErrorCode(ErrorCode.errorNodeNotPermitted)
.build();
}
LOG.info("Data node with ID: {} Registered.",
datanodeID.getDatanodeUuid());
@ -823,4 +831,4 @@ public class SCMNodeManager
}
return nodeCountMap;
}
}
}

View File

@ -26,9 +26,8 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -65,7 +64,7 @@ public final class SCMNodePoolManager implements NodePoolManager {
public static final String DEFAULT_NODEPOOL = "DefaultNodePool";
// DB that saves the node to node pool mapping.
private LevelDBStore nodePoolStore;
private MetadataStore nodePoolStore;
// In-memory node pool to nodes mapping
private HashMap<String, Set<DatanodeID>> nodePools;
@ -84,11 +83,12 @@ public final class SCMNodePoolManager implements NodePoolManager {
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
File metaDir = OzoneUtils.getScmMetadirPath(conf);
String scmMetaDataDir = metaDir.getPath();
Options options = new Options();
options.cacheSize(cacheSize * OzoneConsts.MB);
File nodePoolDBPath = new File(scmMetaDataDir, NODEPOOL_DB);
nodePoolStore = new LevelDBStore(nodePoolDBPath, options);
nodePoolStore = MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setDbFile(nodePoolDBPath)
.setCacheSize(cacheSize * OzoneConsts.MB)
.build();
nodePools = new HashMap<>();
lock = new ReentrantReadWriteLock();
init();
@ -100,14 +100,11 @@ public final class SCMNodePoolManager implements NodePoolManager {
* @throws SCMException
*/
private void init() throws SCMException {
try (DBIterator iter = nodePoolStore.getIterator()) {
for (iter.seekToFirst(); iter.hasNext(); iter.next()) {
try {
nodePoolStore.iterate(null, (key, value) -> {
try {
byte[] key = iter.peekNext().getKey();
DatanodeID nodeId = DatanodeID.getFromProtoBuf(
HdfsProtos.DatanodeIDProto.PARSER.parseFrom(key));
byte[] value = iter.peekNext().getValue();
String poolName = DFSUtil.bytes2String(value);
Set<DatanodeID> nodePool = null;
@ -119,12 +116,14 @@ public final class SCMNodePoolManager implements NodePoolManager {
}
nodePool.add(nodeId);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding node: {} to node pool: {}", nodeId, poolName);
LOG.debug("Adding node: {} to node pool: {}",
nodeId, poolName);
}
} catch (Exception ex) {
} catch (IOException e) {
LOG.warn("Can't add a datanode to node pool, continue next...");
}
}
return true;
});
} catch (IOException e) {
LOG.error("Loading node pool error " + e);
throw new SCMException("Failed to load node pool",
@ -138,7 +137,8 @@ public final class SCMNodePoolManager implements NodePoolManager {
* @param node - name of the datanode.
*/
@Override
public void addNode(final String pool, final DatanodeID node) {
public void addNode(final String pool, final DatanodeID node)
throws IOException {
Preconditions.checkNotNull(pool, "pool name is null");
Preconditions.checkNotNull(node, "node is null");
lock.writeLock().lock();
@ -192,6 +192,10 @@ public final class SCMNodePoolManager implements NodePoolManager {
throw new SCMException(String.format("Unable to find node %s from" +
" pool %s in MAP.", DFSUtil.bytes2String(kName), pool),
FAILED_TO_FIND_NODE_IN_POOL); }
} catch (IOException e) {
throw new SCMException("Failed to remove node " + node.toString()
+ " from node pool " + pool, e,
SCMException.ResultCodes.IO_EXCEPTION);
} finally {
lock.writeLock().unlock();
}
@ -238,14 +242,17 @@ public final class SCMNodePoolManager implements NodePoolManager {
* TODO: Put this in a in-memory map if performance is an issue.
*/
@Override
public String getNodePool(final DatanodeID datanodeID) {
public String getNodePool(final DatanodeID datanodeID) throws SCMException {
Preconditions.checkNotNull(datanodeID, "node is null");
byte[] result = nodePoolStore.get(
datanodeID.getProtoBufMessage().toByteArray());
if (result == null) {
return null;
try {
byte[] result = nodePoolStore.get(
datanodeID.getProtoBufMessage().toByteArray());
return result == null ? null : DFSUtil.bytes2String(result);
} catch (IOException e) {
throw new SCMException("Failed to get node pool for node "
+ datanodeID.toString(), e,
SCMException.ResultCodes.IO_EXCEPTION);
}
return DFSUtil.bytes2String(result);
}
/**

View File

@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
@ -39,8 +38,8 @@ import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.response.VolumeOwner;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -128,8 +127,8 @@ public final class OzoneMetadataManager {
private static final String USER_DB = "/user.db";
private static final String META_DB = "/metadata.db";
private static OzoneMetadataManager bm = null;
private LevelDBStore userDB;
private LevelDBStore metadataDB;
private MetadataStore userDB;
private MetadataStore metadataDB;
private ReadWriteLock lock;
private Charset encoding = Charset.forName("UTF-8");
private String storageRoot;
@ -157,8 +156,14 @@ public final class OzoneMetadataManager {
}
try {
userDB = new LevelDBStore(new File(storageRoot + USER_DB), true);
metadataDB = new LevelDBStore(new File(storageRoot + META_DB), true);
userDB = MetadataStoreBuilder.newBuilder()
.setDbFile(new File(storageRoot + USER_DB))
.setCreateIfMissing(true)
.build();
metadataDB = MetadataStoreBuilder.newBuilder()
.setDbFile(new File(storageRoot + META_DB))
.setCreateIfMissing(true)
.build();
inProgressObjects = new ConcurrentHashMap<>();
} catch (IOException ex) {
LOG.error("Cannot open db :" + ex.getMessage());
@ -230,7 +235,7 @@ public final class OzoneMetadataManager {
metadataDB.put(args.getVolumeName().getBytes(encoding),
newVInfo.toDBString().getBytes(encoding));
} catch (IOException | DBException ex) {
} catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();
@ -295,7 +300,7 @@ public final class OzoneMetadataManager {
userDB.put(args.getResourceName().getBytes(encoding),
volumeList.toDBString().getBytes(encoding));
} catch (IOException | DBException ex) {
} catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();
@ -341,7 +346,7 @@ public final class OzoneMetadataManager {
VolumeInfo info = VolumeInfo.parse(new String(volumeInfo, encoding));
return info.getOwner().getName().equals(acl.getName());
} catch (IOException | DBException ex) {
} catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, null, ex);
} finally {
lock.readLock().unlock();
@ -365,7 +370,7 @@ public final class OzoneMetadataManager {
}
return VolumeInfo.parse(new String(volumeInfo, encoding));
} catch (IOException | DBException ex) {
} catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.readLock().unlock();
@ -405,7 +410,7 @@ public final class OzoneMetadataManager {
prevKey = volName[1];
}
return getFilteredVolumes(volumeList, prefix, prevKey, maxCount);
} catch (IOException | DBException ex) {
} catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args.getArgs(), ex);
} finally {
lock.readLock().unlock();
@ -448,80 +453,54 @@ public final class OzoneMetadataManager {
* @return ListVolumes.
* @throws OzoneException
*/
public ListVolumes listAllVolumes(ListArgs args) throws OzoneException,
IOException {
public ListVolumes listAllVolumes(ListArgs args)
throws OzoneException, IOException {
String prefix = args.getPrefix();
String prevKey = args.getPrevKey();
final String prevKey;
int maxCount = args.getMaxKeys();
String userName = null;
try (DBIterator iterator = this.userDB.getDB().iterator()) {
if (prevKey != null) {
// Format is username/volumeName
String[] volName = args.getPrevKey().split("/");
if (volName.length < 2) {
throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
}
seekToUser(iterator, volName[0]);
userName = new String(iterator.peekNext().getKey(), encoding);
prevKey = volName[1];
} else {
userName = getFirstUser(iterator);
}
if (userName == null || userName.isEmpty()) {
if (args.getPrevKey() != null) {
// Format is username/volumeName
String[] volName = args.getPrevKey().split("/");
if (volName.length < 2) {
throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
}
ListVolumes returnSet = new ListVolumes();
int count = maxCount - returnSet.getVolumes().size();
byte[] userNameBytes = userDB.get(volName[0].getBytes(encoding));
userName = new String(userNameBytes, encoding);
prevKey = volName[1];
} else {
userName = new String(userDB.peekAround(0, null).getKey(), encoding);
prevKey = null;
}
// we need to iterate through users until we get maxcount volumes
// or no more volumes are left.
while (iterator.hasNext() && count > 0) {
if (userName == null || userName.isEmpty()) {
throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
}
userName = new String(iterator.next().getKey(), encoding);
byte[] volumeList = userDB.get(userName.getBytes(encoding));
ListVolumes returnSet = new ListVolumes();
// we need to iterate through users until we get maxcount volumes
// or no more volumes are left.
userDB.iterate(null, (key, value) -> {
int currentSize = returnSet.getVolumes().size();
if (currentSize < maxCount) {
String name = new String(key, encoding);
byte[] volumeList = userDB.get(name.getBytes(encoding));
if (volumeList == null) {
throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
throw new IOException(
ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs()));
}
returnSet.getVolumes().addAll(getFilteredVolumes(
volumeList, prefix, prevKey, count).getVolumes());
count = maxCount - returnSet.getVolumes().size();
returnSet.getVolumes().addAll(
getFilteredVolumes(volumeList, prefix, prevKey,
maxCount - currentSize).getVolumes());
return true;
} else {
return false;
}
return returnSet;
}
}
});
/**
* Returns the first user name from the UserDB.
*
* @return - UserName.
* @throws IOException
*/
String getFirstUser(DBIterator iterator) throws IOException {
iterator.seekToFirst();
if (iterator.hasNext()) {
return new String(iterator.peekNext().getKey(), encoding);
}
return null;
}
/**
* Reposition the DB cursor to the user name.
*
* @param iterator - Current Iterator.
* @param userName - userName to seek to
* @return - DBIterator.
* @throws IOException
*/
DBIterator seekToUser(DBIterator iterator, String userName) throws
IOException {
iterator.seek(userName.getBytes(encoding));
return iterator;
return returnSet;
}
/**
@ -587,7 +566,7 @@ public final class OzoneMetadataManager {
metadataDB.delete(args.getVolumeName().getBytes(encoding));
userDB.put(user.getBytes(encoding),
volumeList.toDBString().getBytes(encoding));
} catch (IOException | DBException ex) {
} catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();
@ -659,7 +638,7 @@ public final class OzoneMetadataManager {
metadataDB.put(args.getResourceName().getBytes(encoding),
bucketInfo.toDBString().getBytes(encoding));
} catch (IOException | DBException ex) {
} catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();
@ -716,7 +695,7 @@ public final class OzoneMetadataManager {
userDB.put(args.getParentName().getBytes(encoding),
bucketList.toDBString().getBytes(encoding));
} catch (IOException | DBException ex) {
} catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();
@ -807,7 +786,7 @@ public final class OzoneMetadataManager {
metadataDB.delete(args.getResourceName().getBytes(encoding));
userDB.put(args.getParentName().getBytes(encoding),
bucketList.toDBString().getBytes(encoding));
} catch (IOException | DBException ex) {
} catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.utils;
import com.google.common.collect.Lists;
import java.util.List;
/**
* An utility class to store a batch of DB write operations.
*/
public class BatchOperation {
/**
* Enum for write operations.
*/
public enum Operation {
DELETE, PUT
}
private List<SingleOperation> operations =
Lists.newArrayList();
/**
* Add a PUT operation into the batch.
*/
public void put(byte[] key, byte[] value) {
operations.add(new SingleOperation(Operation.PUT, key, value));
}
/**
* Add a DELETE operation into the batch.
*/
public void delete(byte[] key) {
operations.add(new SingleOperation(Operation.DELETE, key, null));
}
public List<SingleOperation> getOperations() {
return operations;
}
/**
* A SingleOperation represents a PUT or DELETE operation
* and the data the operation needs to manipulates.
*/
public static class SingleOperation {
private Operation opt;
private byte[] key;
private byte[] value;
public SingleOperation(Operation opt, byte[] key, byte[] value) {
this.opt = opt;
if (key == null) {
throw new IllegalArgumentException("key cannot be null");
}
this.key = key.clone();
this.value = value == null ? null : value.clone();
}
public Operation getOpt() {
return opt;
}
public byte[] getKey() {
return key.clone();
}
public byte[] getValue() {
return value == null ? null : value.clone();
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.utils;
import java.io.IOException;
/**
* A consumer for metadata store key-value entries.
* Used by {@link MetadataStore} class.
*/
@FunctionalInterface
public interface EntryConsumer {
/**
* Consumes a key and value and produces a boolean result.
* @param key key
* @param value value
* @return a boolean value produced by the consumer
* @throws IOException
*/
boolean consume(byte[] key, byte[] value) throws IOException;
}

View File

@ -18,7 +18,8 @@
package org.apache.hadoop.utils;
import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.WriteBatch;
import org.iq80.leveldb.DB;
@ -30,7 +31,6 @@ import org.iq80.leveldb.ReadOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.List;
@ -41,7 +41,7 @@ import java.util.Map.Entry;
/**
* LevelDB interface.
*/
public class LevelDBStore implements Closeable {
public class LevelDBStore implements MetadataStore {
private static final Logger LOG =
LoggerFactory.getLogger(LevelDBStore.class);
@ -51,23 +51,13 @@ public class LevelDBStore implements Closeable {
private final Options dbOptions;
private final WriteOptions writeOptions;
/**
* Opens a DB file.
*
* @param dbPath - DB File path
* @param createIfMissing - Create if missing
* @throws IOException
*/
public LevelDBStore(File dbPath, boolean createIfMissing) throws
IOException {
public LevelDBStore(File dbPath, boolean createIfMissing)
throws IOException {
dbOptions = new Options();
dbOptions.createIfMissing(createIfMissing);
db = JniDBFactory.factory.open(dbPath, dbOptions);
if (db == null) {
throw new IOException("Db is null");
}
this.dbFile = dbPath;
this.writeOptions = new WriteOptions().sync(true);
openDB(dbPath, dbOptions);
}
/**
@ -79,14 +69,23 @@ public class LevelDBStore implements Closeable {
public LevelDBStore(File dbPath, Options options)
throws IOException {
dbOptions = options;
db = JniDBFactory.factory.open(dbPath, options);
if (db == null) {
throw new IOException("Db is null");
}
this.dbFile = dbPath;
this.writeOptions = new WriteOptions().sync(true);
openDB(dbPath, dbOptions);
}
private void openDB(File dbPath, Options options) throws IOException {
db = JniDBFactory.factory.open(dbPath, options);
if (LOG.isDebugEnabled()) {
LOG.debug("LevelDB successfully opened");
LOG.debug("[Option] cacheSize = " + options.cacheSize());
LOG.debug("[Option] createIfMissing = " + options.createIfMissing());
LOG.debug("[Option] blockSize = " + options.blockSize());
LOG.debug("[Option] compressionType= " + options.compressionType());
LOG.debug("[Option] maxOpenFiles= " + options.maxOpenFiles());
LOG.debug("[Option] writeBufferSize= "+ options.writeBufferSize());
}
}
/**
* Puts a Key into file.
@ -94,6 +93,7 @@ public class LevelDBStore implements Closeable {
* @param key - key
* @param value - value
*/
@Override
public void put(byte[] key, byte[] value) {
db.put(key, value, writeOptions);
}
@ -104,6 +104,7 @@ public class LevelDBStore implements Closeable {
* @param key key
* @return value
*/
@Override
public byte[] get(byte[] key) {
return db.get(key);
}
@ -113,6 +114,7 @@ public class LevelDBStore implements Closeable {
*
* @param key - Key
*/
@Override
public void delete(byte[] key) {
db.delete(key);
}
@ -133,24 +135,15 @@ public class LevelDBStore implements Closeable {
* @return boolean
* @throws IOException
*/
@Override
public boolean isEmpty() throws IOException {
DBIterator iter = db.iterator();
try {
try (DBIterator iter = db.iterator()) {
iter.seekToFirst();
return !iter.hasNext();
} finally {
iter.close();
boolean hasNext = !iter.hasNext();
return hasNext;
}
}
/**
* Returns Java File Object that points to the DB.
* @return File
*/
public File getDbFile() {
return dbFile;
}
/**
* Returns the actual levelDB object.
* @return DB handle.
@ -168,39 +161,71 @@ public class LevelDBStore implements Closeable {
}
@Override
public void destroy() throws IOException {
JniDBFactory.factory.destroy(dbFile, dbOptions);
}
/**
* Returns a write batch for write multiple key-value pairs atomically.
* @return write batch that can be commit atomically.
*/
public WriteBatch createWriteBatch() {
return db.createWriteBatch();
@Override
public ImmutablePair<byte[], byte[]> peekAround(int offset,
byte[] from) throws IOException, IllegalArgumentException {
try (DBIterator it = db.iterator()) {
if (from == null) {
it.seekToFirst();
} else {
it.seek(from);
}
if (!it.hasNext()) {
throw new IOException("Key not found");
}
switch (offset) {
case 0:
Entry<byte[], byte[]> current = it.next();
return new ImmutablePair<>(current.getKey(), current.getValue());
case 1:
if (it.next() != null && it.hasNext()) {
Entry<byte[], byte[]> next = it.peekNext();
return new ImmutablePair<>(next.getKey(), next.getValue());
}
break;
case -1:
if (it.hasPrev()) {
Entry<byte[], byte[]> prev = it.peekPrev();
return new ImmutablePair<>(prev.getKey(), prev.getValue());
}
break;
default:
throw new IllegalArgumentException(
"Position can only be -1, 0 " + "or 1, but found " + offset);
}
}
return null;
}
/**
* Commit multiple writes of key-value pairs atomically.
* @param wb
*/
public void commitWriteBatch(WriteBatch wb) {
db.write(wb, writeOptions);
}
/**
* Close a write batch of multiple writes to key-value pairs.
* @param wb - write batch.
* @throws IOException
*/
public void closeWriteBatch(WriteBatch wb) throws IOException {
wb.close();
@Override
public void iterate(byte[] from, EntryConsumer consumer)
throws IOException {
try (DBIterator iter = db.iterator()) {
if (from != null) {
iter.seek(from);
} else {
iter.seekToFirst();
}
while (iter.hasNext()) {
Entry<byte[], byte[]> current = iter.next();
if (!consumer.consume(current.getKey(),
current.getValue())) {
break;
}
}
}
}
/**
* Compacts the DB by removing deleted keys etc.
* @throws IOException if there is an error.
*/
@Override
public void compactDB() throws IOException {
if(db != null) {
// From LevelDB docs : begin == null and end == null means the whole DB.
@ -208,26 +233,33 @@ public class LevelDBStore implements Closeable {
}
}
/**
* Returns a certain range of key value pairs as a list based on a startKey
* or count.
*
* @param keyPrefix start key.
* @param count number of entries to return.
* @return a range of entries or an empty list if nothing found.
* @throws IOException
*
* @see #getRangeKVs(byte[], int, LevelDBKeyFilter...)
*/
public List<Entry<byte[], byte[]>> getRangeKVs(byte[] keyPrefix, int count)
throws IOException {
LevelDBKeyFilter emptyFilter = (preKey, currentKey, nextKey) -> true;
return getRangeKVs(keyPrefix, count, emptyFilter);
@Override
public void writeBatch(BatchOperation operation) throws IOException {
List<BatchOperation.SingleOperation> operations =
operation.getOperations();
if (!operations.isEmpty()) {
try (WriteBatch writeBatch = db.createWriteBatch()) {
for (BatchOperation.SingleOperation opt : operations) {
switch (opt.getOpt()) {
case DELETE:
writeBatch.delete(opt.getKey());
break;
case PUT:
writeBatch.put(opt.getKey(), opt.getValue());
break;
default:
throw new IllegalArgumentException("Invalid operation "
+ opt.getOpt());
}
}
db.write(writeBatch);
}
}
}
/**
* Returns a certain range of key value pairs as a list based on a
* startKey or count. Further a {@link LevelDBKeyFilter} can be added to
* startKey or count. Further a {@link MetadataKeyFilter} can be added to
* filter keys if necessary. To prevent race conditions while listing
* entries, this implementation takes a snapshot and lists the entries from
* the snapshot. This may, on the other hand, cause the range result slight
@ -241,19 +273,20 @@ public class LevelDBStore implements Closeable {
* The count argument is to limit number of total entries to return,
* the value for count must be an integer greater than 0.
* <p>
* This method allows to specify one or more {@link LevelDBKeyFilter}
* This method allows to specify one or more {@link MetadataKeyFilter}
* to filter keys by certain condition. Once given, only the entries
* whose key passes all the filters will be included in the result.
*
* @param startKey a start key.
* @param count max number of entries to return.
* @param filters customized one or more {@link LevelDBKeyFilter}.
* @param filters customized one or more {@link MetadataKeyFilter}.
* @return a list of entries found in the database.
* @throws IOException if an invalid startKey is given or other I/O errors.
* @throws IllegalArgumentException if count is less than 0.
*/
@Override
public List<Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
int count, LevelDBKeyFilter... filters) throws IOException {
int count, MetadataKeyFilter... filters) throws IOException {
List<Entry<byte[], byte[]>> result = new ArrayList<>();
long start = System.currentTimeMillis();
if (count < 0) {
@ -295,8 +328,7 @@ public class LevelDBStore implements Closeable {
long timeConsumed = end - start;
if (LOG.isDebugEnabled()) {
LOG.debug("Time consumed for getRangeKVs() is {},"
+ " result length is {}.",
timeConsumed, result.size());
+ " result length is {}.", timeConsumed, result.size());
}
}
return result;

View File

@ -23,12 +23,12 @@ import org.apache.hadoop.hdfs.DFSUtil;
/**
* An utility class to filter levelDB keys.
*/
public class LevelDBKeyFilters {
public class MetadataKeyFilters {
/**
* Interface for levelDB key filters.
*/
public interface LevelDBKeyFilter {
public interface MetadataKeyFilter {
/**
* Filter levelDB key with a certain condition.
*
@ -44,7 +44,7 @@ public class LevelDBKeyFilters {
* Utility class to filter key by a string prefix. This filter
* assumes keys can be parsed to a string.
*/
public static class KeyPrefixFilter implements LevelDBKeyFilter {
public static class KeyPrefixFilter implements MetadataKeyFilter {
private String keyPrefix = null;

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.utils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* Interface for key-value store that stores ozone metadata.
* Ozone metadata is stored as key value pairs, both key and value
* are arbitrary byte arrays.
*/
@InterfaceStability.Evolving
public interface MetadataStore extends Closeable{
/**
* Puts a key-value pair into the store.
*
* @param key metadata key
* @param value metadata value
*/
void put(byte[] key, byte[] value) throws IOException;
/**
* @return true if the metadata store is empty.
*
* @throws IOException
*/
boolean isEmpty() throws IOException;
/**
* Returns the value mapped to the given key in byte array.
*
* @param key metadata key
* @return value in byte array
* @throws IOException
*/
byte[] get(byte[] key) throws IOException;
/**
* Deletes a key from the metadata store.
*
* @param key metadata key
* @throws IOException
*/
void delete(byte[] key) throws IOException;
/**
* Returns a certain range of key value pairs as a list based on a
* startKey or count. Further a {@link MetadataKeyFilter} can be added to
* filter keys if necessary. To prevent race conditions while listing
* entries, this implementation takes a snapshot and lists the entries from
* the snapshot. This may, on the other hand, cause the range result slight
* different with actual data if data is updating concurrently.
* <p>
* If the startKey is specified and found in levelDB, this key and the keys
* after this key will be included in the result. If the startKey is null
* all entries will be included as long as other conditions are satisfied.
* If the given startKey doesn't exist, an IOException will be thrown.
* <p>
* The count argument is to limit number of total entries to return,
* the value for count must be an integer greater than 0.
* <p>
* This method allows to specify one or more {@link MetadataKeyFilter}
* to filter keys by certain condition. Once given, only the entries
* whose key passes all the filters will be included in the result.
*
* @param startKey a start key.
* @param count max number of entries to return.
* @param filters customized one or more {@link MetadataKeyFilter}.
* @return a list of entries found in the database.
* @throws IOException if an invalid startKey is given or other I/O errors.
* @throws IllegalArgumentException if count is less than 0.
*/
List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
int count, MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException;
/**
* A batch of PUT, DELETE operations handled as a single atomic write.
*
* @throws IOException write fails
*/
void writeBatch(BatchOperation operation) throws IOException;
/**
* Compact the entire database.
* @throws IOException
*/
void compactDB() throws IOException;
/**
* Destroy the content of the specified database,
* a destroyed database will not be able to load again.
* Be very careful with this method.
*
* @throws IOException if I/O error happens
*/
void destroy() throws IOException;
/**
* Seek the database to a certain key, returns the key-value
* pairs around this key based on the given offset. Note, this method
* can only support offset -1 (left), 0 (current) and 1 (right),
* any other offset given will cause a {@link IllegalArgumentException}.
*
* @param offset offset to the key
* @param from from which key
* @return a key-value pair
* @throws IOException
*/
ImmutablePair<byte[], byte[]> peekAround(int offset, byte[] from)
throws IOException, IllegalArgumentException;
/**
* Iterates entries in the database from a certain key.
* Applies the given {@link EntryConsumer} to the key and value of
* each entry, the function produces a boolean result which is used
* as the criteria to exit from iteration.
*
* @param from the start key
* @param consumer
* a {@link EntryConsumer} applied to each key and value. If the consumer
* returns true, continues the iteration to next entry; otherwise exits
* the iteration.
* @throws IOException
*/
void iterate(byte[] from, EntryConsumer consumer)
throws IOException;
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.iq80.leveldb.Options;
import java.io.File;
import java.io.IOException;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB;
/**
* Builder for metadata store.
*/
public class MetadataStoreBuilder {
private File dbFile;
private long cacheSize;
private boolean createIfMissing = true;
private Configuration conf;
public static MetadataStoreBuilder newBuilder() {
return new MetadataStoreBuilder();
}
public MetadataStoreBuilder setDbFile(File dbPath) {
this.dbFile = dbPath;
return this;
}
public MetadataStoreBuilder setCacheSize(long cache) {
this.cacheSize = cache;
return this;
}
public MetadataStoreBuilder setCreateIfMissing(boolean doCreate) {
this.createIfMissing = doCreate;
return this;
}
public MetadataStoreBuilder setConf(Configuration configuration) {
this.conf = configuration;
return this;
}
public MetadataStore build() throws IOException {
if (dbFile == null) {
throw new IllegalArgumentException("Failed to build metadata store, "
+ "dbFile is required but not found");
}
// Build db store based on configuration
MetadataStore store = null;
String impl = conf == null ?
OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT :
conf.getTrimmed(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT);
if (OZONE_METADATA_STORE_IMPL_LEVELDB.equals(impl)) {
Options options = new Options();
options.createIfMissing(createIfMissing);
if (cacheSize > 0) {
options.cacheSize(cacheSize);
}
store = new LevelDBStore(dbFile, options);
} else if (OZONE_METADATA_STORE_IMPL_ROCKSDB.equals(impl)) {
// TODO replace with rocksDB impl
store = new LevelDBStore(dbFile, new Options());
} else {
throw new IllegalArgumentException("Invalid argument for "
+ OzoneConfigKeys.OZONE_METADATA_STORE_IMPL
+ ". Expecting " + OZONE_METADATA_STORE_IMPL_LEVELDB
+ " or " + OZONE_METADATA_STORE_IMPL_ROCKSDB
+ ", but met " + impl);
}
return store;
}
}

View File

@ -565,6 +565,17 @@
</description>
</property>
<property>
<name>ozone.metastore.impl</name>
<value>LevelDB</value>
<description>
Ozone metadata store implementation. Ozone metadata are well distributed
to multiple services such as ksm, scm. They are stored in some local
key-value databases. This property determines which database library to
use. Supported value is either LevelDB or RocksDB.
</description>
</property>
<property>
<name>dfs.cblock.servicerpc-address</name>
<value></value>

View File

@ -17,12 +17,16 @@
*/
package org.apache.hadoop.ozone;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter;
import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.junit.Rule;
import org.junit.Before;
import org.junit.After;
@ -34,13 +38,15 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.UUID;
/**
* Test class for {@link org.apache.hadoop.utils.LevelDBStore}.
* Test class for ozone metadata store.
*/
public class TestLevelDBStore {
public class TestMetadataStore {
private LevelDBStore store;
private MetadataStore store;
private File testDir;
private final static int MAX_GETRANGE_LENGTH = 100;
@ -51,7 +57,16 @@ public class TestLevelDBStore {
@Before
public void init() throws IOException {
testDir = GenericTestUtils.getTestDir(getClass().getSimpleName());
store = new LevelDBStore(testDir, true);
Configuration conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB);
store = MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setCreateIfMissing(true)
.setDbFile(testDir)
.build();
// Add 20 entries.
// {a0 : a-value0} to {a9 : a-value9}
@ -70,11 +85,127 @@ public class TestLevelDBStore {
}
private byte[] getBytes(String str) {
return DFSUtilClient.string2Bytes(str);
return str == null ? null :
DFSUtilClient.string2Bytes(str);
}
private String getString(byte[] bytes) {
return DFSUtilClient.bytes2String(bytes);
return bytes == null ? null :
DFSUtilClient.bytes2String(bytes);
}
@Test
public void testGetDelete() throws IOException {
for (int i=0; i<10; i++) {
byte[] va = store.get(getBytes("a" + i));
Assert.assertEquals("a-value" + i, getString(va));
byte[] vb = store.get(getBytes("b" + i));
Assert.assertEquals("b-value" + i, getString(vb));
}
String keyToDel = "del-" + UUID.randomUUID().toString();
store.put(getBytes(keyToDel), getBytes(keyToDel));
Assert.assertEquals(keyToDel, getString(store.get(getBytes(keyToDel))));
store.delete(getBytes(keyToDel));
Assert.assertEquals(null, store.get(getBytes(keyToDel)));
}
@Test
public void testPeekFrom() throws IOException {
// Test peek from an element that has prev as well as next
testPeek("a3", "a2", "a4");
// Test peek from an element that only has prev
testPeek("b9", "b8", null);
// Test peek from an element that only has next
testPeek("a0", null, "a1");
}
private String getExpectedValue(String key) {
if (key == null) {
return null;
}
char[] arr = key.toCharArray();
return new StringBuffer().append(arr[0]).append("-value")
.append(arr[arr.length - 1]).toString();
}
private void testPeek(String peekKey, String prevKey, String nextKey)
throws IOException {
// Look for current
String k = null;
String v = null;
ImmutablePair<byte[], byte[]> current =
store.peekAround(0, getBytes(peekKey));
if (current != null) {
k = getString(current.getKey());
v = getString(current.getValue());
}
Assert.assertEquals(peekKey, k);
Assert.assertEquals(v, getExpectedValue(peekKey));
// Look for prev
k = null;
v = null;
ImmutablePair<byte[], byte[]> prev =
store.peekAround(-1, getBytes(peekKey));
if (prev != null) {
k = getString(prev.getKey());
v = getString(prev.getValue());
}
Assert.assertEquals(prevKey, k);
Assert.assertEquals(v, getExpectedValue(prevKey));
// Look for next
k = null;
v = null;
ImmutablePair<byte[], byte[]> next =
store.peekAround(1, getBytes(peekKey));
if (next != null) {
k = getString(next.getKey());
v = getString(next.getValue());
}
Assert.assertEquals(nextKey, k);
Assert.assertEquals(v, getExpectedValue(nextKey));
}
@Test
public void testIterateKeys() throws IOException {
// iterate keys from b0
ArrayList<String> result = Lists.newArrayList();
store.iterate(getBytes("b0"), (k, v) -> {
// b-value{i}
String value = getString(v);
char num = value.charAt(value.length() - 1);
// each value adds 1
int i = Character.getNumericValue(num) + 1;
value = value.substring(0, value.length() - 1) + i;
result.add(value);
return true;
});
Assert.assertFalse(result.isEmpty());
for (int i=0; i<result.size(); i++) {
Assert.assertEquals("b-value" + (i+1), result.get(i));
}
// iterate from a non exist key
result.clear();
store.iterate(getBytes("xyz"), (k, v) -> {
result.add(getString(v));
return true;
});
Assert.assertTrue(result.isEmpty());
// iterate from the beginning
result.clear();
store.iterate(null, (k, v) -> {
result.add(getString(v));
return true;
});
Assert.assertEquals(20, result.size());
}
@Test
@ -104,7 +235,7 @@ public class TestLevelDBStore {
// Filter keys by prefix.
// It should returns all "b*" entries.
LevelDBKeyFilter filter1 = new KeyPrefixFilter("b");
MetadataKeyFilter filter1 = new KeyPrefixFilter("b");
result = store.getRangeKVs(null, 100, filter1);
Assert.assertEquals(10, result.size());
Assert.assertTrue(result.stream().allMatch(entry ->
@ -117,7 +248,7 @@ public class TestLevelDBStore {
// Define a customized filter that filters keys by suffix.
// Returns all "*2" entries.
LevelDBKeyFilter filter2 = (preKey, currentKey, nextKey)
MetadataKeyFilter filter2 = (preKey, currentKey, nextKey)
-> getString(currentKey).endsWith("2");
result = store.getRangeKVs(null, MAX_GETRANGE_LENGTH, filter2);
Assert.assertEquals(2, result.size());

View File

@ -32,9 +32,10 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -182,10 +183,13 @@ public class TestContainerPersistence {
String dbPath = status.getContainer().getDBPath();
LevelDBStore store = null;
MetadataStore store = null;
try {
store = new LevelDBStore(new File(dbPath), false);
Assert.assertNotNull(store.getDB());
store = MetadataStoreBuilder.newBuilder()
.setDbFile(new File(dbPath))
.setCreateIfMissing(false)
.build();
Assert.assertNotNull(store);
} finally {
if (store != null) {
store.close();

View File

@ -41,6 +41,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -229,8 +230,8 @@ public class TestContainerReplicationManager {
* @throws TimeoutException
* @throws InterruptedException
*/
public void testAddingNewPoolWorks() throws TimeoutException,
InterruptedException {
public void testAddingNewPoolWorks()
throws TimeoutException, InterruptedException, IOException {
LogCapturer inProgressLog = LogCapturer.captureLogs(
LogFactory.getLog(InProgressPool.class));
GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.ALL);

View File

@ -274,6 +274,8 @@ public class TestKeySpaceManager {
try {
storageHandler.deleteVolume(createVolumeArgs);
Assert.fail("Expecting deletion should fail "
+ "because volume is not empty");
} catch (IOException ex) {
Assert.assertEquals(ex.getMessage(),
"Delete Volume failed, error:VOLUME_NOT_EMPTY");