HDDS-748. Use strongly typed metadata Table implementation. Contributed by Elek Marton.

This commit is contained in:
Bharat Viswanadham 2018-12-01 16:52:23 -08:00
parent 99e201dfe2
commit d15dc43659
18 changed files with 765 additions and 136 deletions

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import org.apache.hadoop.utils.db.Table.KeyValue;
/**
* Key value for raw Table implementations.
*/
public final class ByteArrayKeyValue implements KeyValue<byte[], byte[]> {
private byte[] key;
private byte[] value;
private ByteArrayKeyValue(byte[] key, byte[] value) {
this.key = key;
this.value = value;
}
/**
* Create a KeyValue pair.
*
* @param key - Key Bytes
* @param value - Value bytes
* @return KeyValue object.
*/
public static ByteArrayKeyValue create(byte[] key, byte[] value) {
return new ByteArrayKeyValue(key, value);
}
/**
* Return key.
*
* @return byte[]
*/
public byte[] getKey() {
byte[] result = new byte[key.length];
System.arraycopy(key, 0, result, 0, key.length);
return result;
}
/**
* Return value.
*
* @return byte[]
*/
public byte[] getValue() {
byte[] result = new byte[value.length];
System.arraycopy(value, 0, result, 0, value.length);
return result;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
/**
* Codec interface to marshall/unmarshall data to/from a byte[] based
* key/value store.
*
* @param <T> Unserialized type
*/
public interface Codec<T> {
/**
* Convert object to raw persisted format.
*/
byte[] toPersistedFormat(T object);
/**
* Convert object from raw persisted format.
*/
T fromPersistedFormat(byte[] rawData);
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import java.util.HashMap;
import java.util.Map;
/**
* Collection of available codecs.
*/
public class CodecRegistry {
private Map<Class, Codec<?>> valueCodecs;
public CodecRegistry() {
valueCodecs = new HashMap<>();
valueCodecs.put(String.class, new StringCodec());
}
/**
* Convert raw value to strongly typed value/key with the help of a codec.
*
* @param rawData original byte array from the db.
* @param format Class of the return value
* @param <T> Type of the return value.
* @return the object with the parsed field data
*/
public <T> T asObject(byte[] rawData, Class<T> format) {
if (valueCodecs.containsKey(format)) {
return (T) valueCodecs.get(format).fromPersistedFormat(rawData);
} else {
throw new IllegalStateException(
"Codec is not registered for type: " + format);
}
}
/**
* Convert strongly typed object to raw data to store it in the kv store.
*
* @param object typed object.
* @param <T> Type of the typed object.
* @return byte array to store it ini the kv store.
*/
public <T> byte[] asRawData(T object) {
Class<T> format = (Class<T>) object.getClass();
if (valueCodecs.containsKey(format)) {
Codec<T> codec = (Codec<T>) valueCodecs.get(format);
return codec.toPersistedFormat(object);
} else {
throw new IllegalStateException(
"Codec is not registered for type: " + format);
}
}
}

View File

@ -41,7 +41,17 @@ public interface DBStore extends AutoCloseable {
* @return - TableStore.
* @throws IOException on Failure
*/
Table getTable(String name) throws IOException;
Table<byte[], byte[]> getTable(String name) throws IOException;
/**
* Gets an existing TableStore with implicit key/value conversion.
*
* @param name - Name of the TableStore to get
* @return - TableStore.
* @throws IOException on Failure
*/
<KEY, VALUE> Table<KEY, VALUE> getTable(String name,
Class<KEY> keyType, Class<VALUE> valueType) throws IOException;
/**
* Lists the Known list of Tables in a DB.

View File

@ -55,6 +55,7 @@ public class RDBStore implements DBStore {
private final File dbLocation;
private final WriteOptions writeOptions;
private final DBOptions dbOptions;
private final CodecRegistry codecRegistry;
private final Hashtable<String, ColumnFamilyHandle> handleTable;
private ObjectName statMBeanName;
@ -64,7 +65,7 @@ public class RDBStore implements DBStore {
Preconditions.checkNotNull(families);
Preconditions.checkArgument(families.size() > 0);
handleTable = new Hashtable<>();
codecRegistry = new CodecRegistry();
final List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>();
final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
@ -254,7 +255,7 @@ public class RDBStore implements DBStore {
}
@Override
public Table getTable(String name) throws IOException {
public Table<byte[], byte[]> getTable(String name) throws IOException {
ColumnFamilyHandle handle = handleTable.get(name);
if (handle == null) {
throw new IOException("No such table in this DB. TableName : " + name);
@ -262,6 +263,13 @@ public class RDBStore implements DBStore {
return new RDBTable(this.db, handle, this.writeOptions);
}
@Override
public <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
Class<KEY> keyType, Class<VALUE> valueType) throws IOException {
return new TypedTable<KEY, VALUE>(getTable(name), codecRegistry, keyType,
valueType);
}
@Override
public ArrayList<Table> listTables() throws IOException {
ArrayList<Table> returnList = new ArrayList<>();

View File

@ -19,17 +19,17 @@
package org.apache.hadoop.utils.db;
import org.apache.hadoop.utils.db.Table.KeyValue;
import org.rocksdb.RocksIterator;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import org.rocksdb.RocksIterator;
/**
* RocksDB store iterator.
*/
public class RDBStoreIterator implements TableIterator<KeyValue> {
public class RDBStoreIterator
implements TableIterator<byte[], ByteArrayKeyValue> {
private RocksIterator rocksDBIterator;
@ -39,7 +39,8 @@ public class RDBStoreIterator implements TableIterator<KeyValue> {
}
@Override
public void forEachRemaining(Consumer<? super KeyValue> action) {
public void forEachRemaining(
Consumer<? super ByteArrayKeyValue> action) {
while(hasNext()) {
action.accept(next());
}
@ -51,9 +52,10 @@ public class RDBStoreIterator implements TableIterator<KeyValue> {
}
@Override
public Table.KeyValue next() {
public ByteArrayKeyValue next() {
if (rocksDBIterator.isValid()) {
KeyValue value = KeyValue.create(rocksDBIterator.key(), rocksDBIterator
ByteArrayKeyValue value =
ByteArrayKeyValue.create(rocksDBIterator.key(), rocksDBIterator
.value());
rocksDBIterator.next();
return value;
@ -72,10 +74,10 @@ public class RDBStoreIterator implements TableIterator<KeyValue> {
}
@Override
public KeyValue seek(byte[] key) {
public ByteArrayKeyValue seek(byte[] key) {
rocksDBIterator.seek(key);
if (rocksDBIterator.isValid()) {
return KeyValue.create(rocksDBIterator.key(),
return ByteArrayKeyValue.create(rocksDBIterator.key(),
rocksDBIterator.value());
}
return null;

View File

@ -35,7 +35,8 @@ import org.slf4j.LoggerFactory;
/**
* RocksDB implementation of ozone metadata store.
*/
public class RDBTable implements Table {
public class RDBTable implements Table<byte[], byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(RDBTable.class);
@ -108,7 +109,7 @@ public class RDBTable implements Table {
@Override
public boolean isEmpty() throws IOException {
try (TableIterator<KeyValue> keyIter = iterator()) {
try (TableIterator<byte[], ByteArrayKeyValue> keyIter = iterator()) {
keyIter.seekToFirst();
return !keyIter.hasNext();
}
@ -145,7 +146,7 @@ public class RDBTable implements Table {
}
@Override
public TableIterator<KeyValue> iterator() {
public TableIterator<byte[], ByteArrayKeyValue> iterator() {
ReadOptions readOptions = new ReadOptions();
return new RDBStoreIterator(db.newIterator(handle, readOptions));
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import org.apache.hadoop.hdfs.DFSUtil;
/**
* Codec to convert String to/from byte array.
*/
public class StringCodec implements Codec<String> {
@Override
public byte[] toPersistedFormat(String object) {
if (object != null) {
return DFSUtil.string2Bytes(object);
} else {
return null;
}
}
@Override
public String fromPersistedFormat(byte[] rawData) {
if (rawData != null) {
return DFSUtil.bytes2String(rawData);
} else {
return null;
}
}
}

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability;
* different kind of tables.
*/
@InterfaceStability.Evolving
public interface Table extends AutoCloseable {
public interface Table<KEY, VALUE> extends AutoCloseable {
/**
* Puts a key-value pair into the store.
@ -38,7 +38,7 @@ public interface Table extends AutoCloseable {
* @param key metadata key
* @param value metadata value
*/
void put(byte[] key, byte[] value) throws IOException;
void put(KEY key, VALUE value) throws IOException;
/**
* Puts a key-value pair into the store as part of a bath operation.
@ -47,7 +47,7 @@ public interface Table extends AutoCloseable {
* @param key metadata key
* @param value metadata value
*/
void putWithBatch(BatchOperation batch, byte[] key, byte[] value)
void putWithBatch(BatchOperation batch, KEY key, VALUE value)
throws IOException;
/**
@ -64,7 +64,7 @@ public interface Table extends AutoCloseable {
* @return value in byte array or null if the key is not found.
* @throws IOException on Failure
*/
byte[] get(byte[] key) throws IOException;
VALUE get(KEY key) throws IOException;
/**
* Deletes a key from the metadata store.
@ -72,7 +72,7 @@ public interface Table extends AutoCloseable {
* @param key metadata key
* @throws IOException on Failure
*/
void delete(byte[] key) throws IOException;
void delete(KEY key) throws IOException;
/**
* Deletes a key from the metadata store as part of a batch operation.
@ -81,14 +81,14 @@ public interface Table extends AutoCloseable {
* @param key metadata key
* @throws IOException on Failure
*/
void deleteWithBatch(BatchOperation batch, byte[] key) throws IOException;
void deleteWithBatch(BatchOperation batch, KEY key) throws IOException;
/**
* Returns the iterator for this metadata store.
*
* @return MetaStoreIterator
*/
TableIterator<KeyValue> iterator();
TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator();
/**
* Returns the Name of this Table.
@ -100,53 +100,10 @@ public interface Table extends AutoCloseable {
/**
* Class used to represent the key and value pair of a db entry.
*/
class KeyValue {
interface KeyValue<KEY, VALUE> {
private final byte[] key;
private final byte[] value;
KEY getKey();
/**
* KeyValue Constructor, used to represent a key and value of a db entry.
*
* @param key - Key Bytes
* @param value - Value bytes
*/
private KeyValue(byte[] key, byte[] value) {
this.key = key;
this.value = value;
}
/**
* Create a KeyValue pair.
*
* @param key - Key Bytes
* @param value - Value bytes
* @return KeyValue object.
*/
public static KeyValue create(byte[] key, byte[] value) {
return new KeyValue(key, value);
}
/**
* Return key.
*
* @return byte[]
*/
public byte[] getKey() {
byte[] result = new byte[key.length];
System.arraycopy(key, 0, result, 0, key.length);
return result;
}
/**
* Return value.
*
* @return byte[]
*/
public byte[] getValue() {
byte[] result = new byte[value.length];
System.arraycopy(value, 0, result, 0, value.length);
return result;
}
VALUE getValue();
}
}

View File

@ -27,7 +27,7 @@ import java.util.Iterator;
*
* @param <T>
*/
public interface TableIterator<T> extends Iterator<T>, Closeable {
public interface TableIterator<KEY, T> extends Iterator<T>, Closeable {
/**
* seek to first entry.
@ -43,8 +43,8 @@ public interface TableIterator<T> extends Iterator<T>, Closeable {
* Seek to the specific key.
*
* @param key - Bytes that represent the key.
* @return T.
* @return VALUE.
*/
T seek(byte[] key);
T seek(KEY key);
}

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import java.io.IOException;
/**
* Strongly typed table implementation.
* <p>
* Automatically converts values and keys using a raw byte[] based table
* implementation and registered converters.
*
* @param <KEY> type of the keys in the store.
* @param <VALUE> type of the values in the store.
*/
public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
private Table<byte[], byte[]> rawTable;
private CodecRegistry codecRegistry;
private Class<KEY> keyType;
private Class<VALUE> valueType;
public TypedTable(
Table<byte[], byte[]> rawTable,
CodecRegistry codecRegistry, Class<KEY> keyType,
Class<VALUE> valueType) {
this.rawTable = rawTable;
this.codecRegistry = codecRegistry;
this.keyType = keyType;
this.valueType = valueType;
}
@Override
public void put(KEY key, VALUE value) throws IOException {
byte[] keyData = codecRegistry.asRawData(key);
byte[] valueData = codecRegistry.asRawData(value);
rawTable.put(keyData, valueData);
}
@Override
public void putWithBatch(BatchOperation batch, KEY key, VALUE value)
throws IOException {
byte[] keyData = codecRegistry.asRawData(key);
byte[] valueData = codecRegistry.asRawData(value);
rawTable.putWithBatch(batch, keyData, valueData);
}
@Override
public boolean isEmpty() throws IOException {
return rawTable.isEmpty();
}
@Override
public VALUE get(KEY key) throws IOException {
byte[] keyBytes = codecRegistry.asRawData(key);
byte[] valueBytes = rawTable.get(keyBytes);
return codecRegistry.asObject(valueBytes, valueType);
}
@Override
public void delete(KEY key) throws IOException {
rawTable.delete(codecRegistry.asRawData(key));
}
@Override
public void deleteWithBatch(BatchOperation batch, KEY key)
throws IOException {
rawTable.deleteWithBatch(batch, codecRegistry.asRawData(key));
}
@Override
public TableIterator<KEY, TypedKeyValue> iterator() {
TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> iterator =
rawTable.iterator();
return new TypedTableIterator(iterator, keyType, valueType);
}
@Override
public String getName() throws IOException {
return rawTable.getName();
}
@Override
public void close() throws Exception {
rawTable.close();
}
/**
* Key value implementation for strongly typed tables.
*/
public class TypedKeyValue implements KeyValue<KEY, VALUE> {
private KeyValue<byte[], byte[]> rawKeyValue;
public TypedKeyValue(KeyValue<byte[], byte[]> rawKeyValue) {
this.rawKeyValue = rawKeyValue;
}
public TypedKeyValue(KeyValue<byte[], byte[]> rawKeyValue,
Class<KEY> keyType, Class<VALUE> valueType) {
this.rawKeyValue = rawKeyValue;
}
@Override
public KEY getKey() {
return codecRegistry.asObject(rawKeyValue.getKey(), keyType);
}
@Override
public VALUE getValue() {
return codecRegistry.asObject(rawKeyValue.getValue(), valueType);
}
}
/**
* Table Iterator implementation for strongly typed tables.
*/
public class TypedTableIterator implements TableIterator<KEY, TypedKeyValue> {
private TableIterator<byte[], ? extends KeyValue<byte[], byte[]>>
rawIterator;
public TypedTableIterator(
TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> rawIterator,
Class<KEY> keyType,
Class<VALUE> valueType) {
this.rawIterator = rawIterator;
}
@Override
public void seekToFirst() {
rawIterator.seekToFirst();
}
@Override
public void seekToLast() {
rawIterator.seekToLast();
}
@Override
public TypedKeyValue seek(KEY key) {
byte[] keyBytes = codecRegistry.asRawData(key);
KeyValue<byte[], byte[]> result = rawIterator.seek(keyBytes);
return new TypedKeyValue(result);
}
@Override
public void close() throws IOException {
rawIterator.close();
}
@Override
public boolean hasNext() {
return rawIterator.hasNext();
}
@Override
public TypedKeyValue next() {
return new TypedKeyValue(rawIterator.next(), keyType,
valueType);
}
}
}

View File

@ -124,7 +124,7 @@ public class TestDBStoreBuilder {
.addTable("First")
.addTable("Second")
.build()) {
try (Table firstTable = dbStore.getTable("First")) {
try (Table<byte[], byte[]> firstTable = dbStore.getTable("First")) {
byte[] key =
RandomStringUtils.random(9).getBytes(StandardCharsets.UTF_8);
byte[] value =
@ -154,7 +154,7 @@ public class TestDBStoreBuilder {
.addTable("Second")
.setProfile(DBProfile.DISK)
.build()) {
try (Table firstTable = dbStore.getTable("First")) {
try (Table<byte[], byte[]> firstTable = dbStore.getTable("First")) {
byte[] key =
RandomStringUtils.random(9).getBytes(StandardCharsets.UTF_8);
byte[] value =

View File

@ -19,8 +19,20 @@
package org.apache.hadoop.utils.db;
import org.apache.commons.lang3.RandomStringUtils;
import javax.management.MBeanServer;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -34,17 +46,6 @@ import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
import javax.management.MBeanServer;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* RDBStore Tests.
*/
@ -126,7 +127,8 @@ public class TestRDBStore {
try (Table firstTable = rdbStore.getTable(families.get(1))) {
firstTable.put(key, value);
try (Table secondTable = rdbStore.getTable(families.get(2))) {
try (Table<byte[], byte[]> secondTable = rdbStore
.getTable(families.get(2))) {
rdbStore.move(key, firstTable, secondTable);
byte[] newvalue = secondTable.get(key);
// Make sure we have value in the second table
@ -150,7 +152,8 @@ public class TestRDBStore {
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
try (Table firstTable = rdbStore.getTable(families.get(1))) {
firstTable.put(key, value);
try (Table secondTable = rdbStore.getTable(families.get(2))) {
try (Table<byte[], byte[]> secondTable = rdbStore
.getTable(families.get(2))) {
rdbStore.move(key, nextValue, firstTable, secondTable);
byte[] newvalue = secondTable.get(key);
// Make sure we have value in the second table

View File

@ -95,7 +95,7 @@ public class TestRDBTableStore {
@Test
public void putGetAndEmpty() throws Exception {
try (Table testTable = rdbStore.getTable("First")) {
try (Table<byte[], byte[]> testTable = rdbStore.getTable("First")) {
byte[] key =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
byte[] value =
@ -209,7 +209,7 @@ public class TestRDBTableStore {
testTable.put(key, value);
}
int localCount = 0;
try (TableIterator<Table.KeyValue> iter = testTable.iterator()) {
try (TableIterator<byte[], Table.KeyValue> iter = testTable.iterator()) {
while (iter.hasNext()) {
Table.KeyValue keyValue = iter.next();
localCount++;

View File

@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.utils.db.Table.KeyValue;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
/**
* Tests for RocksDBTable Store.
*/
public class TestTypedRDBTableStore {
private static int count = 0;
private final List<String> families =
Arrays.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
"First", "Second", "Third",
"Fourth", "Fifth",
"Sixth");
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private RDBStore rdbStore = null;
private DBOptions options = null;
private CodecRegistry codecRegistry;
@Before
public void setUp() throws Exception {
options = new DBOptions();
options.setCreateIfMissing(true);
options.setCreateMissingColumnFamilies(true);
Statistics statistics = new Statistics();
statistics.setStatsLevel(StatsLevel.ALL);
options = options.setStatistics(statistics);
Set<TableConfig> configSet = new HashSet<>();
for (String name : families) {
TableConfig newConfig = new TableConfig(name, new ColumnFamilyOptions());
configSet.add(newConfig);
}
rdbStore = new RDBStore(folder.newFolder(), options, configSet);
codecRegistry = new CodecRegistry();
}
@After
public void tearDown() throws Exception {
if (rdbStore != null) {
rdbStore.close();
}
}
@Test
public void toIOException() {
}
@Test
public void putGetAndEmpty() throws Exception {
try (Table<String, String> testTable = createTypedTable(
"First")) {
String key =
RandomStringUtils.random(10);
String value = RandomStringUtils.random(10);
testTable.put(key, value);
Assert.assertFalse(testTable.isEmpty());
String readValue = testTable.get(key);
Assert.assertEquals(value, readValue);
}
try (Table secondTable = rdbStore.getTable("Second")) {
Assert.assertTrue(secondTable.isEmpty());
}
}
private Table<String, String> createTypedTable(String name)
throws IOException {
return new TypedTable<String, String>(
rdbStore.getTable(name),
codecRegistry,
String.class, String.class);
}
@Test
public void delete() throws Exception {
List<String> deletedKeys = new LinkedList<>();
List<String> validKeys = new LinkedList<>();
String value =
RandomStringUtils.random(10);
for (int x = 0; x < 100; x++) {
deletedKeys.add(
RandomStringUtils.random(10));
}
for (int x = 0; x < 100; x++) {
validKeys.add(
RandomStringUtils.random(10));
}
// Write all the keys and delete the keys scheduled for delete.
//Assert we find only expected keys in the Table.
try (Table<String, String> testTable = createTypedTable(
"Fourth")) {
for (int x = 0; x < deletedKeys.size(); x++) {
testTable.put(deletedKeys.get(x), value);
testTable.delete(deletedKeys.get(x));
}
for (int x = 0; x < validKeys.size(); x++) {
testTable.put(validKeys.get(x), value);
}
for (int x = 0; x < validKeys.size(); x++) {
Assert.assertNotNull(testTable.get(validKeys.get(0)));
}
for (int x = 0; x < deletedKeys.size(); x++) {
Assert.assertNull(testTable.get(deletedKeys.get(0)));
}
}
}
@Test
public void batchPut() throws Exception {
try (Table<String, String> testTable = createTypedTable(
"Fourth");
BatchOperation batch = rdbStore.initBatchOperation()) {
//given
String key =
RandomStringUtils.random(10);
String value =
RandomStringUtils.random(10);
//when
testTable.putWithBatch(batch, key, value);
rdbStore.commitBatchOperation(batch);
//then
Assert.assertNotNull(testTable.get(key));
}
}
@Test
public void batchDelete() throws Exception {
try (Table<String, String> testTable = createTypedTable(
"Fourth");
BatchOperation batch = rdbStore.initBatchOperation()) {
//given
String key =
RandomStringUtils.random(10);
String value =
RandomStringUtils.random(10);
testTable.put(key, value);
//when
testTable.deleteWithBatch(batch, key);
rdbStore.commitBatchOperation(batch);
//then
Assert.assertNull(testTable.get(key));
}
}
private static boolean consume(Table.KeyValue keyValue) {
count++;
Assert.assertNotNull(keyValue.getKey());
return true;
}
@Test
public void forEachAndIterator() throws Exception {
final int iterCount = 100;
try (Table<String, String> testTable = createTypedTable(
"Sixth")) {
for (int x = 0; x < iterCount; x++) {
String key =
RandomStringUtils.random(10);
String value =
RandomStringUtils.random(10);
testTable.put(key, value);
}
int localCount = 0;
try (TableIterator<String, ? extends KeyValue<String, String>> iter =
testTable.iterator()) {
while (iter.hasNext()) {
Table.KeyValue keyValue = iter.next();
localCount++;
}
Assert.assertEquals(iterCount, localCount);
iter.seekToFirst();
iter.forEachRemaining(TestTypedRDBTableStore::consume);
Assert.assertEquals(iterCount, count);
}
}
}
}

View File

@ -203,50 +203,50 @@ public interface OMMetadataManager {
*
* @return UserTable.
*/
Table getUserTable();
Table<byte[], byte[]> getUserTable();
/**
* Returns the Volume Table.
*
* @return VolumeTable.
*/
Table getVolumeTable();
Table<byte[], byte[]> getVolumeTable();
/**
* Returns the BucketTable.
*
* @return BucketTable.
*/
Table getBucketTable();
Table<byte[], byte[]> getBucketTable();
/**
* Returns the KeyTable.
*
* @return KeyTable.
*/
Table getKeyTable();
Table<byte[], byte[]> getKeyTable();
/**
* Get Deleted Table.
*
* @return Deleted Table.
*/
Table getDeletedTable();
Table<byte[], byte[]> getDeletedTable();
/**
* Gets the OpenKeyTable.
*
* @return Table.
*/
Table getOpenKeyTable();
Table<byte[], byte[]> getOpenKeyTable();
/**
* Gets the S3Bucket to Ozone Volume/bucket mapping table.
*
* @return Table.
*/
Table getS3Table();
Table<byte[], byte[]> getS3Table();
/**
* Returns number of rows in a table. This should not be used for very
* large tables.
@ -254,5 +254,6 @@ public interface OMMetadataManager {
* @return long
* @throws IOException
*/
long countRowsInTable(Table table) throws IOException;
<KEY, VALUE> long countRowsInTable(Table<KEY, VALUE> table)
throws IOException;
}

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.db.Table;
import org.apache.hadoop.utils.db.Table.KeyValue;
import org.apache.hadoop.utils.db.TableIterator;
import org.junit.AfterClass;
import org.junit.Assert;
@ -652,8 +653,7 @@ public class TestOzoneManager {
// Make sure the deleted key has been moved to the deleted table.
OMMetadataManager manager = cluster.getOzoneManager().
getMetadataManager();
try(TableIterator<Table.KeyValue> iter =
try (TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> iter =
manager.getDeletedTable().iterator()) {
iter.seekToFirst();
Table.KeyValue kv = iter.next();

View File

@ -16,10 +16,16 @@
*/
package org.apache.hadoop.ozone.om;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.DFSUtil;
@ -40,25 +46,20 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.db.DBStore;
import org.apache.hadoop.utils.db.DBStoreBuilder;
import org.apache.hadoop.utils.db.Table;
import org.apache.hadoop.utils.db.Table.KeyValue;
import org.apache.hadoop.utils.db.TableIterator;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Ozone metadata manager interface.
@ -122,37 +123,37 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
}
@Override
public Table getUserTable() {
public Table<byte[], byte[]> getUserTable() {
return userTable;
}
@Override
public Table getVolumeTable() {
public Table<byte[], byte[]> getVolumeTable() {
return volumeTable;
}
@Override
public Table getBucketTable() {
public Table<byte[], byte[]> getBucketTable() {
return bucketTable;
}
@Override
public Table getKeyTable() {
public Table<byte[], byte[]> getKeyTable() {
return keyTable;
}
@Override
public Table getDeletedTable() {
public Table<byte[], byte[]> getDeletedTable() {
return deletedTable;
}
@Override
public Table getOpenKeyTable() {
public Table<byte[], byte[]> getOpenKeyTable() {
return openKeyTable;
}
@Override
public Table getS3Table() {
public Table<byte[], byte[]> getS3Table() {
return s3Table;
}
@ -349,8 +350,9 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
@Override
public boolean isVolumeEmpty(String volume) throws IOException {
byte[] volumePrefix = getVolumeKey(volume + OM_KEY_PREFIX);
try (TableIterator<Table.KeyValue> bucketIter = bucketTable.iterator()) {
Table.KeyValue kv = bucketIter.seek(volumePrefix);
try (TableIterator<byte[], Table.KeyValue> bucketIter = bucketTable
.iterator()) {
Table.KeyValue<byte[], byte[]> kv = bucketIter.seek(volumePrefix);
if (kv != null && startsWith(kv.getKey(), volumePrefix)) {
return false; // we found at least one bucket with this volume prefix.
}
@ -370,8 +372,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
public boolean isBucketEmpty(String volume, String bucket)
throws IOException {
byte[] keyPrefix = getBucketKey(volume, bucket + OM_KEY_PREFIX);
try (TableIterator<Table.KeyValue> keyIter = keyTable.iterator()) {
Table.KeyValue kv = keyIter.seek(keyPrefix);
try (TableIterator<byte[], Table.KeyValue> keyIter = keyTable.iterator()) {
Table.KeyValue<byte[], byte[]> kv = keyIter.seek(keyPrefix);
if (kv != null && startsWith(kv.getKey(), keyPrefix)) {
return false; // we found at least one key with this vol/bucket prefix.
}
@ -422,8 +424,9 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
seekPrefix = getVolumeKey(volumeName + OM_KEY_PREFIX);
}
int currentCount = 0;
try (TableIterator<Table.KeyValue> bucketIter = bucketTable.iterator()) {
Table.KeyValue kv = bucketIter.seek(startKey);
try (TableIterator<byte[], Table.KeyValue> bucketIter = bucketTable
.iterator()) {
Table.KeyValue<byte[], byte[]> kv = bucketIter.seek(startKey);
while (currentCount < maxNumOfBuckets && bucketIter.hasNext()) {
kv = bucketIter.next();
// Skip the Start Bucket if needed.
@ -483,8 +486,10 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
seekPrefix = getBucketKey(volumeName, bucketName + OM_KEY_PREFIX);
}
int currentCount = 0;
try (TableIterator<Table.KeyValue> keyIter = getKeyTable().iterator()) {
Table.KeyValue kv = keyIter.seek(seekKey);
try (TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> keyIter =
getKeyTable()
.iterator()) {
Table.KeyValue<byte[], byte[]> kv = keyIter.seek(seekKey);
while (currentCount < maxKeys && keyIter.hasNext()) {
kv = keyIter.next();
// Skip the Start key if needed.
@ -578,10 +583,12 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
throws IOException {
List<BlockGroup> keyBlocksList = Lists.newArrayList();
try (TableIterator<Table.KeyValue> keyIter = getDeletedTable().iterator()) {
try (TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> keyIter =
getDeletedTable()
.iterator()) {
int currentCount = 0;
while (keyIter.hasNext() && currentCount < keyCount) {
Table.KeyValue kv = keyIter.next();
KeyValue<byte[], byte[]> kv = keyIter.next();
if (kv != null) {
OmKeyInfo info =
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(kv.getValue()));
@ -632,11 +639,12 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
}
@Override
public long countRowsInTable(Table table) throws IOException {
public <KEY, VALUE> long countRowsInTable(Table<KEY, VALUE> table)
throws IOException {
long count = 0;
if (table != null) {
try (TableIterator<Table.KeyValue> keyValueTableIterator =
table.iterator()) {
try (TableIterator<KEY, ? extends KeyValue<KEY, VALUE>>
keyValueTableIterator = table.iterator()) {
while (keyValueTableIterator.hasNext()) {
keyValueTableIterator.next();
count++;