HDFS-11926. Ozone: Implement a common helper to return a range of KVs in levelDB. Contributed by Weiwei Yang.
This commit is contained in:
parent
74ab303993
commit
9961fa3da0
|
@ -1,213 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.ozone.container.common.helpers;
|
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.base.Strings;
|
|
||||||
import org.apache.hadoop.utils.LevelDBStore;
|
|
||||||
import org.iq80.leveldb.DBIterator;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An utility class to get a list of filtered keys.
|
|
||||||
*/
|
|
||||||
public class FilteredKeys implements Closeable {
|
|
||||||
|
|
||||||
private static final Logger LOG =
|
|
||||||
LoggerFactory.getLogger(FilteredKeys.class);
|
|
||||||
|
|
||||||
private final DBIterator dbIterator;
|
|
||||||
private final List<KeyFilter> filters;
|
|
||||||
private int count = 1000;
|
|
||||||
|
|
||||||
public FilteredKeys(LevelDBStore db, int count) {
|
|
||||||
Preconditions.checkNotNull(db, "LeveDBStore cannot be null.");
|
|
||||||
this.dbIterator = db.getIterator();
|
|
||||||
dbIterator.seekToFirst();
|
|
||||||
this.filters = new ArrayList<KeyFilter>();
|
|
||||||
if(count > 0) {
|
|
||||||
this.count = count;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds a key filter which filters keys by a certain criteria.
|
|
||||||
* Valid key filter is an implementation of {@link KeyFilter} class.
|
|
||||||
*
|
|
||||||
* @param filter
|
|
||||||
*/
|
|
||||||
public void addKeyFilter(KeyFilter filter) {
|
|
||||||
filter.setDbIterator(dbIterator);
|
|
||||||
filters.add(filter);
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean filter(String keyName) {
|
|
||||||
if(filters != null && !filters.isEmpty()) {
|
|
||||||
for(KeyFilter filter : filters) {
|
|
||||||
if(!filter.check(keyName)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<KeyData> getFilteredKeys() {
|
|
||||||
List<KeyData> result = new ArrayList<KeyData>();
|
|
||||||
while (dbIterator.hasNext() && result.size() < count) {
|
|
||||||
Map.Entry<byte[], byte[]> entry = dbIterator.next();
|
|
||||||
String keyName = KeyUtils.getKeyName(entry.getKey());
|
|
||||||
if (filter(keyName)) {
|
|
||||||
try {
|
|
||||||
KeyData value = KeyUtils.getKeyData(entry.getValue());
|
|
||||||
KeyData data = new KeyData(value.getContainerName(), keyName);
|
|
||||||
result.add(data);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Ignoring adding an invalid entry", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void close() {
|
|
||||||
if(dbIterator != null) {
|
|
||||||
try {
|
|
||||||
dbIterator.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Failed to close levelDB connection.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An abstract class for all key filters.
|
|
||||||
*/
|
|
||||||
public static abstract class KeyFilter {
|
|
||||||
|
|
||||||
private DBIterator dbIterator;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns if this filter is enabled.
|
|
||||||
*
|
|
||||||
* @return true if this filter is enabled, false otherwise.
|
|
||||||
*/
|
|
||||||
abstract boolean isEnabled();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Filters the element by key name. Returns true if the key
|
|
||||||
* with the given key name complies with the criteria defined
|
|
||||||
* in this filter.
|
|
||||||
*
|
|
||||||
* @param keyName
|
|
||||||
* @return true if filter passes and false otherwise.
|
|
||||||
*/
|
|
||||||
abstract boolean filterKey(String keyName);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If this filter is enabled, returns true if the key with the
|
|
||||||
* given key name complies with the criteria defined in this filter;
|
|
||||||
* if this filter is disabled, always returns true.
|
|
||||||
*
|
|
||||||
* @param keyName
|
|
||||||
* @return true if filter passes and false otherwise.
|
|
||||||
*/
|
|
||||||
public boolean check(String keyName) {
|
|
||||||
return isEnabled()? filterKey(keyName) : true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the {@link DBIterator} this filter used to iterate DB entries.
|
|
||||||
*
|
|
||||||
* @param dbIterator
|
|
||||||
*/
|
|
||||||
protected void setDbIterator(DBIterator dbIterator) {
|
|
||||||
this.dbIterator = dbIterator;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected DBIterator getDbIterator() {
|
|
||||||
return this.dbIterator;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Filters keys with a previous key name,
|
|
||||||
* returns only the keys that whose position is behind the given key name.
|
|
||||||
*/
|
|
||||||
public static class PreKeyFilter extends KeyFilter{
|
|
||||||
|
|
||||||
private final String prevKey;
|
|
||||||
private boolean preKeyFound = false;
|
|
||||||
|
|
||||||
public PreKeyFilter(LevelDBStore db, String prevKey) {
|
|
||||||
Preconditions.checkNotNull(db, "LevelDB store cannot be null.");
|
|
||||||
this.prevKey = prevKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean isEnabled() {
|
|
||||||
return !Strings.isNullOrEmpty(prevKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean filterKey(String keyName) {
|
|
||||||
if (preKeyFound) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
if (getDbIterator().hasPrev()) {
|
|
||||||
byte[] prevKeyBytes = getDbIterator().peekPrev().getKey();
|
|
||||||
String prevKeyActual = KeyUtils.getKeyName(prevKeyBytes);
|
|
||||||
if (prevKeyActual.equals(prevKey)) {
|
|
||||||
preKeyFound = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Filters keys by a key name prefix.
|
|
||||||
*/
|
|
||||||
public static class KeyPrefixFilter extends KeyFilter{
|
|
||||||
|
|
||||||
private String prefix = null;
|
|
||||||
|
|
||||||
public KeyPrefixFilter(String prefix) {
|
|
||||||
this.prefix = prefix;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean isEnabled() {
|
|
||||||
return !Strings.isNullOrEmpty(prefix);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean filterKey(String keyName) {
|
|
||||||
return keyName.startsWith(prefix) ? true : false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -20,9 +20,9 @@ package org.apache.hadoop.ozone.container.common.impl;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.FilteredKeys;
|
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||||
|
@ -30,14 +30,15 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
|
||||||
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
|
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.FilteredKeys.KeyPrefixFilter;
|
import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.FilteredKeys.PreKeyFilter;
|
|
||||||
import org.apache.hadoop.utils.LevelDBStore;
|
import org.apache.hadoop.utils.LevelDBStore;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||||
.Result.IO_EXCEPTION;
|
.Result.IO_EXCEPTION;
|
||||||
|
@ -169,7 +170,7 @@ public class KeyManagerImpl implements KeyManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<KeyData> listKey(
|
public List<KeyData> listKey(
|
||||||
Pipeline pipeline, String prefix, String prevKey, int count)
|
Pipeline pipeline, String prefix, String startKey, int count)
|
||||||
throws StorageContainerException {
|
throws StorageContainerException {
|
||||||
Preconditions.checkNotNull(pipeline,
|
Preconditions.checkNotNull(pipeline,
|
||||||
"Pipeline cannot be null.");
|
"Pipeline cannot be null.");
|
||||||
|
@ -178,10 +179,23 @@ public class KeyManagerImpl implements KeyManager {
|
||||||
ContainerData cData = containerManager.readContainer(pipeline
|
ContainerData cData = containerManager.readContainer(pipeline
|
||||||
.getContainerName());
|
.getContainerName());
|
||||||
LevelDBStore db = KeyUtils.getDB(cData, conf);
|
LevelDBStore db = KeyUtils.getDB(cData, conf);
|
||||||
try (FilteredKeys filteredKeys = new FilteredKeys(db, count)) {
|
try {
|
||||||
filteredKeys.addKeyFilter(new KeyPrefixFilter(prefix));
|
List<KeyData> result = new ArrayList<KeyData>();
|
||||||
filteredKeys.addKeyFilter(new PreKeyFilter(db, prevKey));
|
byte[] startKeyInBytes = startKey == null ? null :
|
||||||
return filteredKeys.getFilteredKeys();
|
DFSUtil.string2Bytes(startKey);
|
||||||
|
KeyPrefixFilter prefixFilter = new KeyPrefixFilter(prefix);
|
||||||
|
List<Map.Entry<byte[], byte[]>> range =
|
||||||
|
db.getRangeKVs(startKeyInBytes, count, prefixFilter);
|
||||||
|
for(Map.Entry<byte[], byte[]> entry : range) {
|
||||||
|
String keyName = KeyUtils.getKeyName(entry.getKey());
|
||||||
|
KeyData value = KeyUtils.getKeyData(entry.getValue());
|
||||||
|
KeyData data = new KeyData(value.getContainerName(), keyName);
|
||||||
|
result.add(data);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new StorageContainerException(e,
|
||||||
|
ContainerProtos.Result.IO_EXCEPTION);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -60,11 +60,11 @@ public interface KeyManager {
|
||||||
*
|
*
|
||||||
* @param pipeline - pipeline.
|
* @param pipeline - pipeline.
|
||||||
* @param prefix - Prefix in needed.
|
* @param prefix - Prefix in needed.
|
||||||
* @param prevKey - Key to Start from, EMPTY_STRING to begin.
|
* @param startKey - Key to start from, EMPTY_STRING to begin.
|
||||||
* @param count - Number of keys to return.
|
* @param count - Number of keys to return.
|
||||||
* @return List of Keys that match the criteria.
|
* @return List of Keys that match the criteria.
|
||||||
*/
|
*/
|
||||||
List<KeyData> listKey(Pipeline pipeline, String prefix, String prevKey,
|
List<KeyData> listKey(Pipeline pipeline, String prefix, String startKey,
|
||||||
int count) throws StorageContainerException;
|
int count) throws StorageContainerException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,65 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.utils;
|
||||||
|
|
||||||
|
import com.google.common.base.Strings;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An utility class to filter levelDB keys.
|
||||||
|
*/
|
||||||
|
public class LevelDBKeyFilters {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface for levelDB key filters.
|
||||||
|
*/
|
||||||
|
public interface LevelDBKeyFilter {
|
||||||
|
/**
|
||||||
|
* Filter levelDB key with a certain condition.
|
||||||
|
*
|
||||||
|
* @param preKey previous key.
|
||||||
|
* @param currentKey current key.
|
||||||
|
* @param nextKey next key.
|
||||||
|
* @return true if a certain condition satisfied, return false otherwise.
|
||||||
|
*/
|
||||||
|
boolean filterKey(byte[] preKey, byte[] currentKey, byte[] nextKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class to filter key by a string prefix. This filter
|
||||||
|
* assumes keys can be parsed to a string.
|
||||||
|
*/
|
||||||
|
public static class KeyPrefixFilter implements LevelDBKeyFilter {
|
||||||
|
|
||||||
|
private String keyPrefix = null;
|
||||||
|
|
||||||
|
public KeyPrefixFilter(String keyPrefix) {
|
||||||
|
this.keyPrefix = keyPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean filterKey(byte[] preKey, byte[] currentKey,
|
||||||
|
byte[] nextKey) {
|
||||||
|
if (Strings.isNullOrEmpty(keyPrefix)) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return currentKey != null &&
|
||||||
|
DFSUtil.bytes2String(currentKey).startsWith(keyPrefix);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,21 +18,34 @@
|
||||||
|
|
||||||
package org.apache.hadoop.utils;
|
package org.apache.hadoop.utils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter;
|
||||||
import org.fusesource.leveldbjni.JniDBFactory;
|
import org.fusesource.leveldbjni.JniDBFactory;
|
||||||
import org.iq80.leveldb.WriteBatch;
|
import org.iq80.leveldb.WriteBatch;
|
||||||
import org.iq80.leveldb.DB;
|
import org.iq80.leveldb.DB;
|
||||||
|
import org.iq80.leveldb.Options;
|
||||||
import org.iq80.leveldb.WriteOptions;
|
import org.iq80.leveldb.WriteOptions;
|
||||||
import org.iq80.leveldb.DBIterator;
|
import org.iq80.leveldb.DBIterator;
|
||||||
import org.iq80.leveldb.Options;
|
import org.iq80.leveldb.Snapshot;
|
||||||
|
import org.iq80.leveldb.ReadOptions;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* LevelDB interface.
|
* LevelDB interface.
|
||||||
*/
|
*/
|
||||||
public class LevelDBStore implements Closeable {
|
public class LevelDBStore implements Closeable {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(LevelDBStore.class);
|
||||||
|
|
||||||
private DB db;
|
private DB db;
|
||||||
private final File dbFile;
|
private final File dbFile;
|
||||||
private final Options dbOptions;
|
private final Options dbOptions;
|
||||||
|
@ -194,4 +207,99 @@ public class LevelDBStore implements Closeable {
|
||||||
db.compactRange(null, null);
|
db.compactRange(null, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a certain range of key value pairs as a list based on a startKey
|
||||||
|
* or count.
|
||||||
|
*
|
||||||
|
* @param keyPrefix start key.
|
||||||
|
* @param count number of entries to return.
|
||||||
|
* @return a range of entries or an empty list if nothing found.
|
||||||
|
* @throws IOException
|
||||||
|
*
|
||||||
|
* @see #getRangeKVs(byte[], int, LevelDBKeyFilter...)
|
||||||
|
*/
|
||||||
|
public List<Entry<byte[], byte[]>> getRangeKVs(byte[] keyPrefix, int count)
|
||||||
|
throws IOException {
|
||||||
|
LevelDBKeyFilter emptyFilter = (preKey, currentKey, nextKey) -> true;
|
||||||
|
return getRangeKVs(keyPrefix, count, emptyFilter);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a certain range of key value pairs as a list based on a
|
||||||
|
* startKey or count. Further a {@link LevelDBKeyFilter} can be added to
|
||||||
|
* filter keys if necessary. To prevent race conditions while listing
|
||||||
|
* entries, this implementation takes a snapshot and lists the entries from
|
||||||
|
* the snapshot. This may, on the other hand, cause the range result slight
|
||||||
|
* different with actual data if data is updating concurrently.
|
||||||
|
* <p>
|
||||||
|
* If the startKey is specified and found in levelDB, this key and the keys
|
||||||
|
* after this key will be included in the result. If the startKey is null
|
||||||
|
* all entries will be included as long as other conditions are satisfied.
|
||||||
|
* If the given startKey doesn't exist, an IOException will be thrown.
|
||||||
|
* <p>
|
||||||
|
* The count argument is to limit number of total entries to return,
|
||||||
|
* the value for count must be an integer greater than 0.
|
||||||
|
* <p>
|
||||||
|
* This method allows to specify one or more {@link LevelDBKeyFilter}
|
||||||
|
* to filter keys by certain condition. Once given, only the entries
|
||||||
|
* whose key passes all the filters will be included in the result.
|
||||||
|
*
|
||||||
|
* @param startKey a start key.
|
||||||
|
* @param count max number of entries to return.
|
||||||
|
* @param filters customized one or more {@link LevelDBKeyFilter}.
|
||||||
|
* @return a list of entries found in the database.
|
||||||
|
* @throws IOException if an invalid startKey is given or other I/O errors.
|
||||||
|
* @throws IllegalArgumentException if count is less than 0.
|
||||||
|
*/
|
||||||
|
public List<Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
|
||||||
|
int count, LevelDBKeyFilter... filters) throws IOException {
|
||||||
|
List<Entry<byte[], byte[]>> result = new ArrayList<>();
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
if (count < 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Invalid count given " + count + ", count must be greater than 0");
|
||||||
|
}
|
||||||
|
Snapshot snapShot = null;
|
||||||
|
DBIterator dbIter = null;
|
||||||
|
try {
|
||||||
|
snapShot = db.getSnapshot();
|
||||||
|
ReadOptions readOptions = new ReadOptions().snapshot(snapShot);
|
||||||
|
dbIter = db.iterator(readOptions);
|
||||||
|
dbIter.seekToFirst();
|
||||||
|
if (startKey == null) {
|
||||||
|
dbIter.seekToFirst();
|
||||||
|
} else {
|
||||||
|
if (db.get(startKey) == null) {
|
||||||
|
throw new IOException("Invalid start key, not found in current db.");
|
||||||
|
}
|
||||||
|
dbIter.seek(startKey);
|
||||||
|
}
|
||||||
|
while (dbIter.hasNext() && result.size() < count) {
|
||||||
|
byte[] preKey = dbIter.hasPrev() ? dbIter.peekPrev().getKey() : null;
|
||||||
|
byte[] nextKey = dbIter.hasNext() ? dbIter.peekNext().getKey() : null;
|
||||||
|
Entry<byte[], byte[]> current = dbIter.next();
|
||||||
|
if (filters == null || Arrays.asList(filters).stream()
|
||||||
|
.allMatch(entry -> entry.filterKey(preKey,
|
||||||
|
current.getKey(), nextKey))) {
|
||||||
|
result.add(current);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (snapShot != null) {
|
||||||
|
snapShot.close();
|
||||||
|
}
|
||||||
|
if (dbIter != null) {
|
||||||
|
dbIter.close();
|
||||||
|
}
|
||||||
|
long end = System.currentTimeMillis();
|
||||||
|
long timeConsumed = end - start;
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Time consumed for getRangeKVs() is {},"
|
||||||
|
+ " result length is {}.",
|
||||||
|
timeConsumed, result.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,165 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
|
||||||
|
import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter;
|
||||||
|
import org.apache.hadoop.utils.LevelDBStore;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test class for {@link org.apache.hadoop.utils.LevelDBStore}.
|
||||||
|
*/
|
||||||
|
public class TestLevelDBStore {
|
||||||
|
|
||||||
|
private LevelDBStore store;
|
||||||
|
private File testDir;
|
||||||
|
|
||||||
|
private final static int MAX_GETRANGE_LENGTH = 100;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public ExpectedException expectedException = ExpectedException.none();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() throws IOException {
|
||||||
|
testDir = GenericTestUtils.getTestDir(getClass().getSimpleName());
|
||||||
|
store = new LevelDBStore(testDir, true);
|
||||||
|
|
||||||
|
// Add 20 entries.
|
||||||
|
// {a0 : a-value0} to {a9 : a-value9}
|
||||||
|
// {b0 : a-value0} to {b0 : b-value9}
|
||||||
|
for (int i=0; i<10; i++) {
|
||||||
|
store.put(getBytes("a" + i), getBytes("a-value" + i));
|
||||||
|
store.put(getBytes("b" + i), getBytes("b-value" + i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanup() throws IOException {
|
||||||
|
store.close();
|
||||||
|
store.destroy();
|
||||||
|
FileUtils.deleteDirectory(testDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] getBytes(String str) {
|
||||||
|
return DFSUtilClient.string2Bytes(str);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getString(byte[] bytes) {
|
||||||
|
return DFSUtilClient.bytes2String(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetRangeKVs() throws IOException {
|
||||||
|
List<Map.Entry<byte[], byte[]>> result = null;
|
||||||
|
|
||||||
|
// Set empty startKey will return values from beginning.
|
||||||
|
result = store.getRangeKVs(null, 5);
|
||||||
|
Assert.assertEquals(5, result.size());
|
||||||
|
Assert.assertEquals("a-value2", getString(result.get(2).getValue()));
|
||||||
|
|
||||||
|
// Returns max available entries after a valid startKey.
|
||||||
|
result = store.getRangeKVs(getBytes("b0"), MAX_GETRANGE_LENGTH);
|
||||||
|
Assert.assertEquals(10, result.size());
|
||||||
|
Assert.assertEquals("b0", getString(result.get(0).getKey()));
|
||||||
|
Assert.assertEquals("b-value0", getString(result.get(0).getValue()));
|
||||||
|
result = store.getRangeKVs(getBytes("b0"), 5);
|
||||||
|
Assert.assertEquals(5, result.size());
|
||||||
|
|
||||||
|
// Both startKey and count are honored.
|
||||||
|
result = store.getRangeKVs(getBytes("a9"), 2);
|
||||||
|
Assert.assertEquals(2, result.size());
|
||||||
|
Assert.assertEquals("a9", getString(result.get(0).getKey()));
|
||||||
|
Assert.assertEquals("a-value9", getString(result.get(0).getValue()));
|
||||||
|
Assert.assertEquals("b0", getString(result.get(1).getKey()));
|
||||||
|
Assert.assertEquals("b-value0", getString(result.get(1).getValue()));
|
||||||
|
|
||||||
|
// Filter keys by prefix.
|
||||||
|
// It should returns all "b*" entries.
|
||||||
|
LevelDBKeyFilter filter1 = new KeyPrefixFilter("b");
|
||||||
|
result = store.getRangeKVs(null, 100, filter1);
|
||||||
|
Assert.assertEquals(10, result.size());
|
||||||
|
Assert.assertTrue(result.stream().allMatch(entry ->
|
||||||
|
new String(entry.getKey()).startsWith("b")
|
||||||
|
));
|
||||||
|
result = store.getRangeKVs(null, 3, filter1);
|
||||||
|
Assert.assertEquals(3, result.size());
|
||||||
|
result = store.getRangeKVs(getBytes("b3"), 1, filter1);
|
||||||
|
Assert.assertEquals("b-value3", getString(result.get(0).getValue()));
|
||||||
|
|
||||||
|
// Define a customized filter that filters keys by suffix.
|
||||||
|
// Returns all "*2" entries.
|
||||||
|
LevelDBKeyFilter filter2 = (preKey, currentKey, nextKey)
|
||||||
|
-> getString(currentKey).endsWith("2");
|
||||||
|
result = store.getRangeKVs(null, MAX_GETRANGE_LENGTH, filter2);
|
||||||
|
Assert.assertEquals(2, result.size());
|
||||||
|
Assert.assertEquals("a2", getString(result.get(0).getKey()));
|
||||||
|
Assert.assertEquals("b2", getString(result.get(1).getKey()));
|
||||||
|
result = store.getRangeKVs(null, 1, filter2);
|
||||||
|
Assert.assertEquals(1, result.size());
|
||||||
|
Assert.assertEquals("a2", getString(result.get(0).getKey()));
|
||||||
|
|
||||||
|
// Apply multiple filters.
|
||||||
|
result = store.getRangeKVs(null, MAX_GETRANGE_LENGTH, filter1, filter2);
|
||||||
|
Assert.assertEquals(1, result.size());
|
||||||
|
Assert.assertEquals("b2", getString(result.get(0).getKey()));
|
||||||
|
Assert.assertEquals("b-value2", getString(result.get(0).getValue()));
|
||||||
|
|
||||||
|
// If filter is null, no effect.
|
||||||
|
result = store.getRangeKVs(null, 1, null);
|
||||||
|
Assert.assertEquals(1, result.size());
|
||||||
|
Assert.assertEquals("a0", getString(result.get(0).getKey()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetRangeLength() throws IOException {
|
||||||
|
List<Map.Entry<byte[], byte[]>> result = null;
|
||||||
|
|
||||||
|
result = store.getRangeKVs(null, 0);
|
||||||
|
Assert.assertEquals(0, result.size());
|
||||||
|
|
||||||
|
result = store.getRangeKVs(null, 1);
|
||||||
|
Assert.assertEquals(1, result.size());
|
||||||
|
|
||||||
|
// Count less than zero is invalid.
|
||||||
|
expectedException.expect(IllegalArgumentException.class);
|
||||||
|
expectedException.expectMessage("Invalid count given");
|
||||||
|
store.getRangeKVs(null, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidStartKey() throws IOException {
|
||||||
|
// If startKey is invalid, throws an invalid key exception.
|
||||||
|
expectedException.expect(IOException.class);
|
||||||
|
expectedException.expectMessage("Invalid start key");
|
||||||
|
store.getRangeKVs(getBytes("unknownKey"), MAX_GETRANGE_LENGTH);
|
||||||
|
}
|
||||||
|
}
|
|
@ -760,17 +760,17 @@ public class TestContainerPersistence {
|
||||||
Assert.assertEquals(expectedKeys.get(1), result.get(0).getKeyName());
|
Assert.assertEquals(expectedKeys.get(1), result.get(0).getKeyName());
|
||||||
|
|
||||||
|
|
||||||
// List key with preKev filter
|
// List key with startKey filter
|
||||||
String k6 = expectedKeys.get(6);
|
String k6 = expectedKeys.get(6);
|
||||||
result = keyManager.listKey(pipeline, null, k6, 100);
|
result = keyManager.listKey(pipeline, null, k6, 100);
|
||||||
|
|
||||||
Assert.assertEquals(3, result.size());
|
Assert.assertEquals(4, result.size());
|
||||||
for (int i = 7; i < 10; i++) {
|
for (int i = 6; i < 10; i++) {
|
||||||
Assert.assertEquals(expectedKeys.get(i),
|
Assert.assertEquals(expectedKeys.get(i),
|
||||||
result.get(i - 7).getKeyName());
|
result.get(i - 6).getKeyName());
|
||||||
}
|
}
|
||||||
|
|
||||||
// List key with both prefix and preKey filter
|
// List key with both prefix and startKey filter
|
||||||
String k7 = expectedKeys.get(7);
|
String k7 = expectedKeys.get(7);
|
||||||
result = keyManager.listKey(pipeline, "k3", k7, 100);
|
result = keyManager.listKey(pipeline, "k3", k7, 100);
|
||||||
// k3 is after k7, enhance we get an empty result
|
// k3 is after k7, enhance we get an empty result
|
||||||
|
|
Loading…
Reference in New Issue