HDFS-11569. Ozone: Implement listKey function for KeyManager. Contributed by Weiwei Yang.
This commit is contained in:
parent
cbb7cc26a5
commit
7d4a2d4104
|
@ -0,0 +1,213 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.helpers;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import org.apache.hadoop.utils.LevelDBStore;
|
||||
import org.iq80.leveldb.DBIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* An utility class to get a list of filtered keys.
|
||||
*/
|
||||
public class FilteredKeys implements Closeable {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FilteredKeys.class);
|
||||
|
||||
private final DBIterator dbIterator;
|
||||
private final List<KeyFilter> filters;
|
||||
private int count = 1000;
|
||||
|
||||
public FilteredKeys(LevelDBStore db, int count) {
|
||||
Preconditions.checkNotNull(db, "LeveDBStore cannot be null.");
|
||||
this.dbIterator = db.getIterator();
|
||||
dbIterator.seekToFirst();
|
||||
this.filters = new ArrayList<KeyFilter>();
|
||||
if(count > 0) {
|
||||
this.count = count;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a key filter which filters keys by a certain criteria.
|
||||
* Valid key filter is an implementation of {@link KeyFilter} class.
|
||||
*
|
||||
* @param filter
|
||||
*/
|
||||
public void addKeyFilter(KeyFilter filter) {
|
||||
filter.setDbIterator(dbIterator);
|
||||
filters.add(filter);
|
||||
}
|
||||
|
||||
private boolean filter(String keyName) {
|
||||
if(filters != null && !filters.isEmpty()) {
|
||||
for(KeyFilter filter : filters) {
|
||||
if(!filter.check(keyName)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public List<KeyData> getFilteredKeys() {
|
||||
List<KeyData> result = new ArrayList<KeyData>();
|
||||
while (dbIterator.hasNext() && result.size() < count) {
|
||||
Map.Entry<byte[], byte[]> entry = dbIterator.next();
|
||||
String keyName = KeyUtils.getKeyName(entry.getKey());
|
||||
if (filter(keyName)) {
|
||||
try {
|
||||
KeyData value = KeyUtils.getKeyData(entry.getValue());
|
||||
KeyData data = new KeyData(value.getContainerName(), keyName);
|
||||
result.add(data);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Ignoring adding an invalid entry", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
if(dbIterator != null) {
|
||||
try {
|
||||
dbIterator.close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to close levelDB connection.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An abstract class for all key filters.
|
||||
*/
|
||||
public static abstract class KeyFilter {
|
||||
|
||||
private DBIterator dbIterator;
|
||||
|
||||
/**
|
||||
* Returns if this filter is enabled.
|
||||
*
|
||||
* @return true if this filter is enabled, false otherwise.
|
||||
*/
|
||||
abstract boolean isEnabled();
|
||||
|
||||
/**
|
||||
* Filters the element by key name. Returns true if the key
|
||||
* with the given key name complies with the criteria defined
|
||||
* in this filter.
|
||||
*
|
||||
* @param keyName
|
||||
* @return true if filter passes and false otherwise.
|
||||
*/
|
||||
abstract boolean filterKey(String keyName);
|
||||
|
||||
/**
|
||||
* If this filter is enabled, returns true if the key with the
|
||||
* given key name complies with the criteria defined in this filter;
|
||||
* if this filter is disabled, always returns true.
|
||||
*
|
||||
* @param keyName
|
||||
* @return true if filter passes and false otherwise.
|
||||
*/
|
||||
public boolean check(String keyName) {
|
||||
return isEnabled()? filterKey(keyName) : true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link DBIterator} this filter used to iterate DB entries.
|
||||
*
|
||||
* @param dbIterator
|
||||
*/
|
||||
protected void setDbIterator(DBIterator dbIterator) {
|
||||
this.dbIterator = dbIterator;
|
||||
}
|
||||
|
||||
protected DBIterator getDbIterator() {
|
||||
return this.dbIterator;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters keys with a previous key name,
|
||||
* returns only the keys that whose position is behind the given key name.
|
||||
*/
|
||||
public static class PreKeyFilter extends KeyFilter{
|
||||
|
||||
private final String prevKey;
|
||||
private boolean preKeyFound = false;
|
||||
|
||||
public PreKeyFilter(LevelDBStore db, String prevKey) {
|
||||
Preconditions.checkNotNull(db, "LevelDB store cannot be null.");
|
||||
this.prevKey = prevKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isEnabled() {
|
||||
return !Strings.isNullOrEmpty(prevKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterKey(String keyName) {
|
||||
if (preKeyFound) {
|
||||
return true;
|
||||
} else {
|
||||
if (getDbIterator().hasPrev()) {
|
||||
byte[] prevKeyBytes = getDbIterator().peekPrev().getKey();
|
||||
String prevKeyActual = KeyUtils.getKeyName(prevKeyBytes);
|
||||
if (prevKeyActual.equals(prevKey)) {
|
||||
preKeyFound = true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters keys by a key name prefix.
|
||||
*/
|
||||
public static class KeyPrefixFilter extends KeyFilter{
|
||||
|
||||
private String prefix = null;
|
||||
|
||||
public KeyPrefixFilter(String prefix) {
|
||||
this.prefix = prefix;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isEnabled() {
|
||||
return !Strings.isNullOrEmpty(prefix);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterKey(String keyName) {
|
||||
return keyName.startsWith(prefix) ? true : false;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -33,6 +33,8 @@ import java.nio.charset.Charset;
|
|||
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||
.Result.UNABLE_TO_READ_METADATA_DB;
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||
.Result.NO_SUCH_KEY;
|
||||
|
||||
/**
|
||||
* Utils functions to help key functions.
|
||||
|
@ -137,4 +139,30 @@ public final class KeyUtils {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the key name from a bytes array.
|
||||
* @param bytes key name in bytes.
|
||||
* @return key name string.
|
||||
*/
|
||||
public static String getKeyName(byte[] bytes) {
|
||||
return new String(bytes, ENCODING);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the {@link KeyData} from a bytes array.
|
||||
*
|
||||
* @param bytes key data in bytes.
|
||||
* @return key data.
|
||||
* @throws IOException if the bytes array is malformed or invalid.
|
||||
*/
|
||||
public static KeyData getKeyData(byte[] bytes) throws IOException {
|
||||
try {
|
||||
ContainerProtos.KeyData kd = ContainerProtos.KeyData.parseFrom(bytes);
|
||||
KeyData data = KeyData.getFromProtoBuf(kd);
|
||||
return data;
|
||||
} catch (IOException e) {
|
||||
throw new StorageContainerException("Failed to parse key data from the bytes array.",
|
||||
NO_SUCH_KEY);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.FilteredKeys;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
|
@ -29,6 +30,8 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
|
|||
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.FilteredKeys.KeyPrefixFilter;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.FilteredKeys.PreKeyFilter;
|
||||
import org.apache.hadoop.utils.LevelDBStore;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -165,10 +168,21 @@ public class KeyManagerImpl implements KeyManager {
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public List<KeyData> listKey(Pipeline pipeline, String prefix, String
|
||||
prevKey, int count) {
|
||||
// TODO : Implement listKey function.
|
||||
return null;
|
||||
public List<KeyData> listKey(
|
||||
Pipeline pipeline, String prefix, String prevKey, int count)
|
||||
throws StorageContainerException {
|
||||
Preconditions.checkNotNull(pipeline,
|
||||
"Pipeline cannot be null.");
|
||||
Preconditions.checkArgument(count > 0,
|
||||
"Count must be a positive number.");
|
||||
ContainerData cData = containerManager.readContainer(pipeline
|
||||
.getContainerName());
|
||||
LevelDBStore db = KeyUtils.getDB(cData, conf);
|
||||
try (FilteredKeys filteredKeys = new FilteredKeys(db, count)) {
|
||||
filteredKeys.addKeyFilter(new KeyPrefixFilter(prefix));
|
||||
filteredKeys.addKeyFilter(new PreKeyFilter(db, prevKey));
|
||||
return filteredKeys.getFilteredKeys();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -64,8 +64,8 @@ public interface KeyManager {
|
|||
* @param count - Number of keys to return.
|
||||
* @return List of Keys that match the criteria.
|
||||
*/
|
||||
List<KeyData> listKey(Pipeline pipeline, String prefix, String prevKey, int
|
||||
count);
|
||||
List<KeyData> listKey(Pipeline pipeline, String prefix, String prevKey,
|
||||
int count) throws StorageContainerException;
|
||||
|
||||
/**
|
||||
* Shutdown keyManager.
|
||||
|
|
|
@ -57,6 +57,8 @@ import java.util.HashMap;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.ArrayList;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper
|
||||
.createSingleNodePipeline;
|
||||
|
@ -306,7 +308,10 @@ public class TestContainerPersistence {
|
|||
ContainerData cData = new ContainerData(containerName);
|
||||
cData.addMetadata("VOLUME", "shire");
|
||||
cData.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(pipeline, cData);
|
||||
if(!containerManager.getContainerMap()
|
||||
.containsKey(containerName)) {
|
||||
containerManager.createContainer(pipeline, cData);
|
||||
}
|
||||
ChunkInfo info = getChunk(keyName, 0, 0, datalen);
|
||||
byte[] data = getData(datalen);
|
||||
setDataChecksum(info, data);
|
||||
|
@ -681,4 +686,75 @@ public class TestContainerPersistence {
|
|||
createSingleNodePipeline("non_exist_container"),
|
||||
"non_exist_container", newData);
|
||||
}
|
||||
|
||||
private KeyData writeKeyHelper(Pipeline pipeline,
|
||||
String containerName, String keyName)
|
||||
throws IOException, NoSuchAlgorithmException {
|
||||
ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline);
|
||||
KeyData keyData = new KeyData(containerName, keyName);
|
||||
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
|
||||
chunkList.add(info.getProtoBufMessage());
|
||||
keyData.setChunks(chunkList);
|
||||
return keyData;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListKey() throws Exception {
|
||||
String containerName = "c-0";
|
||||
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||
List<String> expectedKeys = new ArrayList<String>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
String keyName = "k" + i + "-" + UUID.randomUUID();
|
||||
expectedKeys.add(keyName);
|
||||
KeyData kd = writeKeyHelper(pipeline, containerName, keyName);
|
||||
keyManager.putKey(pipeline, kd);
|
||||
}
|
||||
|
||||
// List all keys
|
||||
List<KeyData> result = keyManager.listKey(pipeline, null, null, 100);
|
||||
Assert.assertEquals(10, result.size());
|
||||
|
||||
int index = 0;
|
||||
for (int i = index; i < result.size(); i++) {
|
||||
KeyData data = result.get(i);
|
||||
Assert.assertEquals(containerName, data.getContainerName());
|
||||
Assert.assertEquals(expectedKeys.get(i), data.getKeyName());
|
||||
index++;
|
||||
}
|
||||
|
||||
// List key with prefix
|
||||
result = keyManager.listKey(pipeline, "k1", null, 100);
|
||||
// There is only one key with prefix k1
|
||||
Assert.assertEquals(1, result.size());
|
||||
Assert.assertEquals(expectedKeys.get(1), result.get(0).getKeyName());
|
||||
|
||||
|
||||
// List key with preKev filter
|
||||
String k6 = expectedKeys.get(6);
|
||||
result = keyManager.listKey(pipeline, null, k6, 100);
|
||||
|
||||
Assert.assertEquals(3, result.size());
|
||||
for (int i = 7; i < 10; i++) {
|
||||
Assert.assertEquals(expectedKeys.get(i),
|
||||
result.get(i - 7).getKeyName());
|
||||
}
|
||||
|
||||
// List key with both prefix and preKey filter
|
||||
String k7 = expectedKeys.get(7);
|
||||
result = keyManager.listKey(pipeline, "k3", k7, 100);
|
||||
// k3 is after k7, enhance we get an empty result
|
||||
Assert.assertTrue(result.isEmpty());
|
||||
|
||||
// Set a pretty small cap for the key count
|
||||
result = keyManager.listKey(pipeline, null, null, 3);
|
||||
Assert.assertEquals(3, result.size());
|
||||
for (int i = 0; i < 3; i++) {
|
||||
Assert.assertEquals(expectedKeys.get(i), result.get(i).getKeyName());
|
||||
}
|
||||
|
||||
// Count must be >0
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage("Count must be a positive number.");
|
||||
keyManager.listKey(pipeline, null, null, -1);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue