From 7d4a2d410404f3157c00deb54bfbdbb9c3e27c9a Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Wed, 12 Apr 2017 15:36:56 -0700 Subject: [PATCH] HDFS-11569. Ozone: Implement listKey function for KeyManager. Contributed by Weiwei Yang. --- .../common/helpers/FilteredKeys.java | 213 ++++++++++++++++++ .../container/common/helpers/KeyUtils.java | 28 +++ .../container/common/impl/KeyManagerImpl.java | 22 +- .../common/interfaces/KeyManager.java | 4 +- .../common/impl/TestContainerPersistence.java | 78 ++++++- 5 files changed, 338 insertions(+), 7 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FilteredKeys.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FilteredKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FilteredKeys.java new file mode 100644 index 00000000000..ee62314ca79 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FilteredKeys.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.helpers; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.hadoop.utils.LevelDBStore; +import org.iq80.leveldb.DBIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * An utility class to get a list of filtered keys. + */ +public class FilteredKeys implements Closeable { + + private static final Logger LOG = + LoggerFactory.getLogger(FilteredKeys.class); + + private final DBIterator dbIterator; + private final List filters; + private int count = 1000; + + public FilteredKeys(LevelDBStore db, int count) { + Preconditions.checkNotNull(db, "LeveDBStore cannot be null."); + this.dbIterator = db.getIterator(); + dbIterator.seekToFirst(); + this.filters = new ArrayList(); + if(count > 0) { + this.count = count; + } + } + + /** + * Adds a key filter which filters keys by a certain criteria. + * Valid key filter is an implementation of {@link KeyFilter} class. + * + * @param filter + */ + public void addKeyFilter(KeyFilter filter) { + filter.setDbIterator(dbIterator); + filters.add(filter); + } + + private boolean filter(String keyName) { + if(filters != null && !filters.isEmpty()) { + for(KeyFilter filter : filters) { + if(!filter.check(keyName)) { + return false; + } + } + } + return true; + } + + public List getFilteredKeys() { + List result = new ArrayList(); + while (dbIterator.hasNext() && result.size() < count) { + Map.Entry entry = dbIterator.next(); + String keyName = KeyUtils.getKeyName(entry.getKey()); + if (filter(keyName)) { + try { + KeyData value = KeyUtils.getKeyData(entry.getValue()); + KeyData data = new KeyData(value.getContainerName(), keyName); + result.add(data); + } catch (IOException e) { + LOG.warn("Ignoring adding an invalid entry", e); + } + } + } + return result; + } + + @Override public void close() { + if(dbIterator != null) { + try { + dbIterator.close(); + } catch (IOException e) { + LOG.warn("Failed to close levelDB connection.", e); + } + } + } + + /** + * An abstract class for all key filters. + */ + public static abstract class KeyFilter { + + private DBIterator dbIterator; + + /** + * Returns if this filter is enabled. + * + * @return true if this filter is enabled, false otherwise. + */ + abstract boolean isEnabled(); + + /** + * Filters the element by key name. Returns true if the key + * with the given key name complies with the criteria defined + * in this filter. + * + * @param keyName + * @return true if filter passes and false otherwise. + */ + abstract boolean filterKey(String keyName); + + /** + * If this filter is enabled, returns true if the key with the + * given key name complies with the criteria defined in this filter; + * if this filter is disabled, always returns true. + * + * @param keyName + * @return true if filter passes and false otherwise. + */ + public boolean check(String keyName) { + return isEnabled()? filterKey(keyName) : true; + } + + /** + * Set the {@link DBIterator} this filter used to iterate DB entries. + * + * @param dbIterator + */ + protected void setDbIterator(DBIterator dbIterator) { + this.dbIterator = dbIterator; + } + + protected DBIterator getDbIterator() { + return this.dbIterator; + } + } + + /** + * Filters keys with a previous key name, + * returns only the keys that whose position is behind the given key name. + */ + public static class PreKeyFilter extends KeyFilter{ + + private final String prevKey; + private boolean preKeyFound = false; + + public PreKeyFilter(LevelDBStore db, String prevKey) { + Preconditions.checkNotNull(db, "LevelDB store cannot be null."); + this.prevKey = prevKey; + } + + @Override + protected boolean isEnabled() { + return !Strings.isNullOrEmpty(prevKey); + } + + @Override + protected boolean filterKey(String keyName) { + if (preKeyFound) { + return true; + } else { + if (getDbIterator().hasPrev()) { + byte[] prevKeyBytes = getDbIterator().peekPrev().getKey(); + String prevKeyActual = KeyUtils.getKeyName(prevKeyBytes); + if (prevKeyActual.equals(prevKey)) { + preKeyFound = true; + } + } + return false; + } + } + } + + /** + * Filters keys by a key name prefix. + */ + public static class KeyPrefixFilter extends KeyFilter{ + + private String prefix = null; + + public KeyPrefixFilter(String prefix) { + this.prefix = prefix; + } + + @Override + protected boolean isEnabled() { + return !Strings.isNullOrEmpty(prefix); + } + + @Override + protected boolean filterKey(String keyName) { + return keyName.startsWith(prefix) ? true : false; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java index 191183a7c86..4eb89990d20 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java @@ -33,6 +33,8 @@ import java.nio.charset.Charset; import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos .Result.UNABLE_TO_READ_METADATA_DB; +import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .Result.NO_SUCH_KEY; /** * Utils functions to help key functions. @@ -137,4 +139,30 @@ public final class KeyUtils { return builder.build(); } + /** + * Parses the key name from a bytes array. + * @param bytes key name in bytes. + * @return key name string. + */ + public static String getKeyName(byte[] bytes) { + return new String(bytes, ENCODING); + } + + /** + * Parses the {@link KeyData} from a bytes array. + * + * @param bytes key data in bytes. + * @return key data. + * @throws IOException if the bytes array is malformed or invalid. + */ + public static KeyData getKeyData(byte[] bytes) throws IOException { + try { + ContainerProtos.KeyData kd = ContainerProtos.KeyData.parseFrom(bytes); + KeyData data = KeyData.getFromProtoBuf(kd); + return data; + } catch (IOException e) { + throw new StorageContainerException("Failed to parse key data from the bytes array.", + NO_SUCH_KEY); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java index 8bd8a69adde..f2740b7cc5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.FilteredKeys; import org.apache.hadoop.ozone.container.common.helpers.KeyData; import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; @@ -29,6 +30,8 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; import org.apache.hadoop.ozone.container.common.utils.ContainerCache; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.FilteredKeys.KeyPrefixFilter; +import org.apache.hadoop.ozone.container.common.helpers.FilteredKeys.PreKeyFilter; import org.apache.hadoop.utils.LevelDBStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -165,10 +168,21 @@ public class KeyManagerImpl implements KeyManager { * {@inheritDoc} */ @Override - public List listKey(Pipeline pipeline, String prefix, String - prevKey, int count) { - // TODO : Implement listKey function. - return null; + public List listKey( + Pipeline pipeline, String prefix, String prevKey, int count) + throws StorageContainerException { + Preconditions.checkNotNull(pipeline, + "Pipeline cannot be null."); + Preconditions.checkArgument(count > 0, + "Count must be a positive number."); + ContainerData cData = containerManager.readContainer(pipeline + .getContainerName()); + LevelDBStore db = KeyUtils.getDB(cData, conf); + try (FilteredKeys filteredKeys = new FilteredKeys(db, count)) { + filteredKeys.addKeyFilter(new KeyPrefixFilter(prefix)); + filteredKeys.addKeyFilter(new PreKeyFilter(db, prevKey)); + return filteredKeys.getFilteredKeys(); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java index c8cc1829b7b..1d8469e7683 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java @@ -64,8 +64,8 @@ public interface KeyManager { * @param count - Number of keys to return. * @return List of Keys that match the criteria. */ - List listKey(Pipeline pipeline, String prefix, String prevKey, int - count); + List listKey(Pipeline pipeline, String prefix, String prevKey, + int count) throws StorageContainerException; /** * Shutdown keyManager. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index 06d373319c1..4ff0da3e12e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -57,6 +57,8 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.ArrayList; +import java.util.UUID; import static org.apache.hadoop.ozone.container.ContainerTestHelper .createSingleNodePipeline; @@ -306,7 +308,10 @@ public class TestContainerPersistence { ContainerData cData = new ContainerData(containerName); cData.addMetadata("VOLUME", "shire"); cData.addMetadata("owner)", "bilbo"); - containerManager.createContainer(pipeline, cData); + if(!containerManager.getContainerMap() + .containsKey(containerName)) { + containerManager.createContainer(pipeline, cData); + } ChunkInfo info = getChunk(keyName, 0, 0, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); @@ -681,4 +686,75 @@ public class TestContainerPersistence { createSingleNodePipeline("non_exist_container"), "non_exist_container", newData); } + + private KeyData writeKeyHelper(Pipeline pipeline, + String containerName, String keyName) + throws IOException, NoSuchAlgorithmException { + ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline); + KeyData keyData = new KeyData(containerName, keyName); + List chunkList = new LinkedList<>(); + chunkList.add(info.getProtoBufMessage()); + keyData.setChunks(chunkList); + return keyData; + } + + @Test + public void testListKey() throws Exception { + String containerName = "c-0"; + Pipeline pipeline = createSingleNodePipeline(containerName); + List expectedKeys = new ArrayList(); + for (int i = 0; i < 10; i++) { + String keyName = "k" + i + "-" + UUID.randomUUID(); + expectedKeys.add(keyName); + KeyData kd = writeKeyHelper(pipeline, containerName, keyName); + keyManager.putKey(pipeline, kd); + } + + // List all keys + List result = keyManager.listKey(pipeline, null, null, 100); + Assert.assertEquals(10, result.size()); + + int index = 0; + for (int i = index; i < result.size(); i++) { + KeyData data = result.get(i); + Assert.assertEquals(containerName, data.getContainerName()); + Assert.assertEquals(expectedKeys.get(i), data.getKeyName()); + index++; + } + + // List key with prefix + result = keyManager.listKey(pipeline, "k1", null, 100); + // There is only one key with prefix k1 + Assert.assertEquals(1, result.size()); + Assert.assertEquals(expectedKeys.get(1), result.get(0).getKeyName()); + + + // List key with preKev filter + String k6 = expectedKeys.get(6); + result = keyManager.listKey(pipeline, null, k6, 100); + + Assert.assertEquals(3, result.size()); + for (int i = 7; i < 10; i++) { + Assert.assertEquals(expectedKeys.get(i), + result.get(i - 7).getKeyName()); + } + + // List key with both prefix and preKey filter + String k7 = expectedKeys.get(7); + result = keyManager.listKey(pipeline, "k3", k7, 100); + // k3 is after k7, enhance we get an empty result + Assert.assertTrue(result.isEmpty()); + + // Set a pretty small cap for the key count + result = keyManager.listKey(pipeline, null, null, 3); + Assert.assertEquals(3, result.size()); + for (int i = 0; i < 3; i++) { + Assert.assertEquals(expectedKeys.get(i), result.get(i).getKeyName()); + } + + // Count must be >0 + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Count must be a positive number."); + keyManager.listKey(pipeline, null, null, -1); + } }