HDDS-356. Support ColumnFamily based RocksDBStore and TableStore.

Contributed by Anu Engineer.
Anu Engineer 2018-08-22 18:55:14 -07:00
parent af4b705b5f
commit b021249ac8
10 changed files with 1285 additions and 0 deletions

DBStore.java

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
import java.util.List;
/**
* The DBStore interface provides the ability to create Tables, which store
* a specific type of Key-Value pair. Some DB interfaces, like LevelDB, do not
* support this; in those cases a table creation maps to the default
* store.
*
*/
@InterfaceStability.Evolving
public interface DBStore extends AutoCloseable {
/**
* Gets an existing Table.
*
* @param name - Name of the Table to get
* @return - Table.
* @throws IOException on Failure
*/
Table getTable(String name) throws IOException;
/**
* Lists the known Tables in a DB.
*
* @return List of Tables; for RocksDB and LevelDB this will contain at
* least one entry, the DEFAULT table.
* @throws IOException on Failure
*/
List<Table> listTables() throws IOException;
/**
* Compact the entire database.
*
* @throws IOException on Failure
*/
void compactDB() throws IOException;
/**
* Moves a key from the Source Table to the destination Table.
*
* @param key - Key to move.
* @param source - Source Table.
* @param dest - Destination Table.
* @throws IOException on Failure
*/
void move(byte[] key, Table source, Table dest) throws IOException;
/**
* Moves a key from the Source Table to the destination Table and updates the
* destination to the new value.
*
* @param key - Key to move.
* @param value - new value to write to the destination table.
* @param source - Source Table.
* @param dest - Destination Table.
* @throws IOException on Failure
*/
void move(byte[] key, byte[] value, Table source, Table dest)
throws IOException;
/**
* Returns an estimated count of keys in this DB.
*
* @return long, estimate of keys in the DB.
* @throws IOException on Failure
*/
long getEstimatedKeyCount() throws IOException;
}
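
A minimal caller sketch (not part of this commit), assuming the RDBStore implementation and DFSUtil helper from the files below; the db path and the "users" family name are made-up examples.

// Sketch only. Assumes: java.io.File, java.util.Arrays/List,
// java.nio.charset.StandardCharsets, org.rocksdb.*, DFSUtil, RDBStore, Table.
// Table.close() throws Exception, so the enclosing method declares it.
DBOptions options = new DBOptions();
options.setCreateIfMissing(true);
options.setCreateMissingColumnFamilies(true);
List<String> families = Arrays.asList(
DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY), "users");
try (RDBStore store = new RDBStore(new File("/tmp/ozone-db"), options,
families)) {
try (Table users = store.getTable("users")) {
users.put("volume1".getBytes(StandardCharsets.UTF_8),
"metadata".getBytes(StandardCharsets.UTF_8));
}
}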

RDBStore.java

@@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.utils.RocksDBStoreMBean;
import com.google.common.annotations.VisibleForTesting;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
/**
* RocksDB Store that supports creating Tables in DB.
*/
public class RDBStore implements DBStore {
private static final Logger LOG =
LoggerFactory.getLogger(RDBStore.class);
private final RocksDB db;
private final File dbLocation;
private final WriteOptions writeOptions;
private final DBOptions dbOptions;
private final Hashtable<String, ColumnFamilyHandle> handleTable;
private ObjectName statMBeanName;
public RDBStore(File dbFile, DBOptions options, List<String> families)
throws IOException {
Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
Preconditions.checkNotNull(families);
Preconditions.checkArgument(families.size() > 0);
handleTable = new Hashtable<>();
final List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>();
final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
for (String family : families) {
columnFamilyDescriptors.add(
new ColumnFamilyDescriptor(family.getBytes(StandardCharsets.UTF_8),
new ColumnFamilyOptions()));
}
dbOptions = options;
dbLocation = dbFile;
// TODO: Read the write options from configuration.
writeOptions = new WriteOptions();
try {
db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles);
for (int x = 0; x < columnFamilyHandles.size(); x++) {
handleTable.put(
DFSUtil.bytes2String(columnFamilyHandles.get(x).getName()),
columnFamilyHandles.get(x));
}
if (dbOptions.statistics() != null) {
Map<String, String> jmxProperties = new HashMap<>();
jmxProperties.put("dbName", dbFile.getName());
statMBeanName = MBeans.register("Ozone", "RocksDbStore", jmxProperties,
new RocksDBStoreMBean(dbOptions.statistics()));
if (statMBeanName == null) {
LOG.warn("jmx registration failed during RocksDB init, db path :{}",
dbFile.getAbsolutePath());
}
}
} catch (RocksDBException e) {
throw toIOException(
"Failed init RocksDB, db path : " + dbFile.getAbsolutePath(), e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("RocksDB successfully opened.");
LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath());
LOG.debug("[Option] createIfMissing = {}", options.createIfMissing());
LOG.debug("[Option] maxOpenFiles= {}", options.maxOpenFiles());
}
}
public static IOException toIOException(String msg, RocksDBException e) {
String statusCode = e.getStatus() == null ? "N/A" :
e.getStatus().getCodeString();
String errMessage = e.getMessage() == null ? "Unknown error" :
e.getMessage();
String output = msg + "; status : " + statusCode
+ "; message : " + errMessage;
return new IOException(output, e);
}
@Override
public void compactDB() throws IOException {
if (db != null) {
try {
db.compactRange();
} catch (RocksDBException e) {
throw toIOException("Failed to compact db", e);
}
}
}
@Override
public void close() throws IOException {
for (final ColumnFamilyHandle handle : handleTable.values()) {
handle.close();
}
if (dbOptions != null) {
dbOptions.close();
}
if (writeOptions != null) {
writeOptions.close();
}
if (statMBeanName != null) {
MBeans.unregister(statMBeanName);
statMBeanName = null;
}
if (db != null) {
db.close();
}
}
@Override
public void move(byte[] key, Table source, Table dest) throws IOException {
RDBTable sourceTable;
RDBTable destTable;
if (source instanceof RDBTable) {
sourceTable = (RDBTable) source;
} else {
LOG.error("Unexpected Table type. Expected RocksTable Store for Source.");
throw new IOException("Unexpected TableStore Type in source. Expected "
+ "RocksDBTable.");
}
if (dest instanceof RDBTable) {
destTable = (RDBTable) dest;
} else {
LOG.error("Unexpected Table type. Expected RocksTable Store for Dest.");
throw new IOException("Unexpected TableStore Type in dest. Expected "
+ "RocksDBTable.");
}
try (WriteBatch batch = new WriteBatch()) {
byte[] value = sourceTable.get(key);
batch.put(destTable.getHandle(), key, value);
batch.delete(sourceTable.getHandle(), key);
db.write(writeOptions, batch);
} catch (RocksDBException rocksDBException) {
LOG.error("Move of key failed. Key: {}", DFSUtil.bytes2String(key));
throw toIOException("Unable to move key: " + DFSUtil.bytes2String(key),
rocksDBException);
}
}
@Override
public void move(byte[] key, byte[] value, Table source,
Table dest) throws IOException {
RDBTable sourceTable;
RDBTable destTable;
if (source instanceof RDBTable) {
sourceTable = (RDBTable) source;
} else {
LOG.error("Unexpected Table type. Expected RocksTable Store for Source.");
throw new IOException("Unexpected TableStore Type in source. Expected "
+ "RocksDBTable.");
}
if (dest instanceof RDBTable) {
destTable = (RDBTable) dest;
} else {
LOG.error("Unexpected Table type. Expected RocksTable Store for Dest.");
throw new IOException("Unexpected TableStore Type in dest. Expected "
+ "RocksDBTable.");
}
try (WriteBatch batch = new WriteBatch()) {
batch.put(destTable.getHandle(), key, value);
batch.delete(sourceTable.getHandle(), key);
db.write(writeOptions, batch);
} catch (RocksDBException rocksDBException) {
LOG.error("Move of key failed. Key: {}", DFSUtil.bytes2String(key));
throw toIOException("Unable to move key: " + DFSUtil.bytes2String(key),
rocksDBException);
}
}
@Override
public long getEstimatedKeyCount() throws IOException {
try {
return Long.parseLong(db.getProperty("rocksdb.estimate-num-keys"));
} catch (RocksDBException e) {
throw toIOException("Unable to get the estimated count.", e);
}
}
@VisibleForTesting
protected ObjectName getStatMBeanName() {
return statMBeanName;
}
@Override
public Table getTable(String name) throws IOException {
ColumnFamilyHandle handle = handleTable.get(name);
if (handle == null) {
throw new IOException("No such table in this DB. TableName : " + name);
}
return new RDBTable(this.db, handle, this.writeOptions);
}
@Override
public ArrayList<Table> listTables() throws IOException {
ArrayList<Table> returnList = new ArrayList<>();
for (ColumnFamilyHandle handle: handleTable.values()) {
returnList.add(new RDBTable(db, handle, writeOptions));
}
return returnList;
}
}
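
Both move overloads are atomic: the put into the destination table and the delete from the source commit through a single WriteBatch. A hedged usage sketch; "store" is an open RDBStore, and the "First"/"Second" family names follow the tests later in this commit.

// Sketch only: after move(), the key exists only in the destination table.
byte[] key = "key1".getBytes(StandardCharsets.UTF_8);
try (Table first = store.getTable("First");
Table second = store.getTable("Second")) {
first.put(key, "v1".getBytes(StandardCharsets.UTF_8));
store.move(key, first, second);
// first.get(key) now returns null; second.get(key) returns "v1".
}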

RDBStoreIterator.java

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import org.apache.hadoop.utils.db.Table.KeyValue;
import org.rocksdb.RocksIterator;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
/**
* RocksDB store iterator.
*/
public class RDBStoreIterator implements TableIterator<KeyValue> {
private RocksIterator rocksDBIterator;
public RDBStoreIterator(RocksIterator iterator) {
this.rocksDBIterator = iterator;
rocksDBIterator.seekToFirst();
}
@Override
public void forEachRemaining(Consumer<? super KeyValue> action) {
while (hasNext()) {
action.accept(next());
}
}
@Override
public boolean hasNext() {
return rocksDBIterator.isValid();
}
@Override
public Table.KeyValue next() {
if (rocksDBIterator.isValid()) {
KeyValue value = KeyValue.create(rocksDBIterator.key(), rocksDBIterator
.value());
rocksDBIterator.next();
return value;
}
throw new NoSuchElementException("RocksDB Store has no more elements");
}
@Override
public void seekToFirst() {
rocksDBIterator.seekToFirst();
}
@Override
public void seekToLast() {
rocksDBIterator.seekToLast();
}
@Override
public KeyValue seek(byte[] key) {
rocksDBIterator.seek(key);
if (rocksDBIterator.isValid()) {
return KeyValue.create(rocksDBIterator.key(),
rocksDBIterator.value());
}
return null;
}
@Override
public void close() throws IOException {
rocksDBIterator.close();
}
}
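
A hedged scan sketch for this iterator, obtained through Table.iterator(); "table" is assumed to be an open Table.

// Sketch only: the constructor already seeks to the first entry, so a plain
// hasNext()/next() loop walks the whole column family in key order.
try (TableIterator<Table.KeyValue> iter = table.iterator()) {
while (iter.hasNext()) {
Table.KeyValue kv = iter.next();
// kv.getKey() and kv.getValue() return defensive copies of the entry.
}
}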

RDBTable.java

@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import org.apache.hadoop.hdfs.DFSUtil;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* RocksDB implementation of an Ozone metadata Table; each instance wraps one
* column family.
*/
public class RDBTable implements Table {
private static final Logger LOG =
LoggerFactory.getLogger(RDBTable.class);
private final RocksDB db;
private final ColumnFamilyHandle handle;
private final WriteOptions writeOptions;
/**
* Constructs a TableStore.
*
* @param db - RocksDB instance that we are using.
* @param handle - ColumnFamily Handle.
* @param writeOptions - RocksDB write Options.
*/
public RDBTable(RocksDB db, ColumnFamilyHandle handle,
WriteOptions writeOptions) {
this.db = db;
this.handle = handle;
this.writeOptions = writeOptions;
}
/**
* Converts RocksDB exception to IOE.
* @param msg - Message to add to exception.
* @param e - Original Exception.
* @return IOE.
*/
public static IOException toIOException(String msg, RocksDBException e) {
String statusCode = e.getStatus() == null ? "N/A" :
e.getStatus().getCodeString();
String errMessage = e.getMessage() == null ? "Unknown error" :
e.getMessage();
String output = msg + "; status : " + statusCode
+ "; message : " + errMessage;
return new IOException(output, e);
}
/**
* Returns the Column family Handle.
*
* @return ColumnFamilyHandle.
*/
@Override
public ColumnFamilyHandle getHandle() {
return handle;
}
@Override
public void put(byte[] key, byte[] value) throws IOException {
try {
db.put(handle, writeOptions, key, value);
} catch (RocksDBException e) {
LOG.error("Failed to write to DB. Key: {}", new String(key,
StandardCharsets.UTF_8));
throw toIOException("Failed to put key-value to metadata "
+ "store", e);
}
}
@Override
public boolean isEmpty() throws IOException {
try (TableIterator<KeyValue> keyIter = iterator()) {
keyIter.seekToFirst();
return !keyIter.hasNext();
}
}
@Override
public byte[] get(byte[] key) throws IOException {
try {
return db.get(handle, key);
} catch (RocksDBException e) {
throw toIOException(
"Failed to get the value for the given key", e);
}
}
@Override
public void delete(byte[] key) throws IOException {
try {
db.delete(handle, key);
} catch (RocksDBException e) {
throw toIOException("Failed to delete the given key", e);
}
}
@Override
public void writeBatch(WriteBatch operation) throws IOException {
try {
db.write(writeOptions, operation);
} catch (RocksDBException e) {
throw toIOException("Batch write operation failed", e);
}
}
@Override
public TableIterator<KeyValue> iterator() {
ReadOptions readOptions = new ReadOptions();
return new RDBStoreIterator(db.newIterator(handle, readOptions));
}
@Override
public String getName() throws IOException {
try {
return DFSUtil.bytes2String(this.getHandle().getName());
} catch (RocksDBException rdbEx) {
throw toIOException("Unable to get the table name.", rdbEx);
}
}
@Override
public void close() throws Exception {
// Nothing to do for a Column Family.
}
}
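
Since every RocksDBException is rethrown as an IOException via toIOException, callers of RDBTable handle a single exception type. A hedged sketch of the resulting pattern; "table" and "key" are assumed.

// Sketch only: RocksDBException never escapes RDBTable.
try {
byte[] value = table.get(key);
if (value == null) {
// Key absent: get() returns null rather than throwing.
}
} catch (IOException e) {
// e.getCause() is the original RocksDBException; the message carries
// the RocksDB status code.
}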

Table.java

@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import org.apache.hadoop.classification.InterfaceStability;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.WriteBatch;
import java.io.IOException;
/**
* Interface for a key-value store that stores Ozone metadata. Ozone metadata
* is stored as key-value pairs; both key and value are arbitrary byte arrays.
* Each Table stores a certain kind of keys and values, which allows a DB to
* have tables of different kinds.
*/
@InterfaceStability.Evolving
public interface Table extends AutoCloseable {
/**
* Puts a key-value pair into the store.
*
* @param key metadata key
* @param value metadata value
*/
void put(byte[] key, byte[] value) throws IOException;
/**
* @return true if the metadata store is empty.
* @throws IOException on Failure
*/
boolean isEmpty() throws IOException;
/**
* Returns the value mapped to the given key in byte array or returns null
* if the key is not found.
*
* @param key metadata key
* @return value in byte array or null if the key is not found.
* @throws IOException on Failure
*/
byte[] get(byte[] key) throws IOException;
/**
* Deletes a key from the metadata store.
*
* @param key metadata key
* @throws IOException on Failure
*/
void delete(byte[] key) throws IOException;
/**
* Return the Column Family handle. TODO: This leaks a RocksDB abstraction
* into Ozone code; clean up later.
*
* @return ColumnFamilyHandle
*/
ColumnFamilyHandle getHandle();
/**
* A batch of PUT, DELETE operations handled as a single atomic write.
*
* @throws IOException if the write fails
*/
void writeBatch(WriteBatch operation) throws IOException;
/**
* Returns the iterator for this metadata store.
*
* @return MetaStoreIterator
*/
TableIterator<KeyValue> iterator();
/**
* Returns the Name of this Table.
* @return - Table Name.
* @throws IOException on failure.
*/
String getName() throws IOException;
/**
* Class used to represent the key and value pair of a db entry.
*/
class KeyValue {
private final byte[] key;
private final byte[] value;
/**
* KeyValue Constructor, used to represent a key and value of a db entry.
*
* @param key - Key Bytes
* @param value - Value bytes
*/
private KeyValue(byte[] key, byte[] value) {
this.key = key;
this.value = value;
}
/**
* Create a KeyValue pair.
*
* @param key - Key Bytes
* @param value - Value bytes
* @return KeyValue object.
*/
public static KeyValue create(byte[] key, byte[] value) {
return new KeyValue(key, value);
}
/**
* Return key.
*
* @return byte[]
*/
public byte[] getKey() {
byte[] result = new byte[key.length];
System.arraycopy(key, 0, result, 0, key.length);
return result;
}
/**
* Return value.
*
* @return byte[]
*/
public byte[] getValue() {
byte[] result = new byte[value.length];
System.arraycopy(value, 0, result, 0, value.length);
return result;
}
}
}
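
A hedged sketch of writeBatch, mirroring the writeBatch test later in this commit: operations staged on one WriteBatch are applied as a single atomic write. "table", the keys, and the values are assumed.

// Sketch only: both mutations commit together or not at all.
try (WriteBatch batch = new WriteBatch()) {
batch.put(table.getHandle(), key1, value1);
batch.delete(table.getHandle(), key2);
table.writeBatch(batch);
}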

TableIterator.java

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import java.io.Closeable;
import java.util.Iterator;
/**
* Iterator for MetaDataStore DB.
*
* @param <T> - Type of the entries returned by this iterator.
*/
public interface TableIterator<T> extends Iterator<T>, Closeable {
/**
* seek to first entry.
*/
void seekToFirst();
/**
* seek to last entry.
*/
void seekToLast();
/**
* Seek to the specific key.
*
* @param key - Bytes that represent the key.
* @return T.
*/
T seek(byte[] key);
}
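
A hedged sketch of seek against the RDBStoreIterator implementation above; "iter" is an open TableIterator.

// Sketch only: seek() positions the iterator at the first entry whose key is
// greater than or equal to the target and returns it, or null past the end.
Table.KeyValue kv = iter.seek("key050".getBytes(StandardCharsets.UTF_8));
if (kv == null) {
// No entry at or after "key050".
}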

package-info.java

@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* Database interfaces for Ozone.
*/
package org.apache.hadoop.utils.db;

TestRDBStore.java

@@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdfs.DFSUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
import javax.management.MBeanServer;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* RDBStore Tests.
*/
public class TestRDBStore {
private final List<String> families =
Arrays.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
"First", "Second", "Third",
"Fourth", "Fifth",
"Sixth");
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@Rule
public ExpectedException thrown = ExpectedException.none();
private RDBStore rdbStore = null;
private DBOptions options = null;
@Before
public void setUp() throws Exception {
options = new DBOptions();
options.setCreateIfMissing(true);
options.setCreateMissingColumnFamilies(true);
Statistics statistics = new Statistics();
statistics.setStatsLevel(StatsLevel.ALL);
options = options.setStatistics(statistics);
rdbStore = new RDBStore(folder.newFolder(), options, families);
}
@After
public void tearDown() throws Exception {
if (rdbStore != null) {
rdbStore.close();
}
}
@Test
public void compactDB() throws Exception {
try (RDBStore newStore =
new RDBStore(folder.newFolder(), options, families)) {
Assert.assertNotNull("DB Store cannot be null", newStore);
try (Table firstTable = newStore.getTable(families.get(1))) {
Assert.assertNotNull("Table cannot be null", firstTable);
for (int x = 0; x < 100; x++) {
byte[] key =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
byte[] value =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
firstTable.put(key, value);
}
}
// This test does not assert anything; if there is any error the test
// will throw and fail.
newStore.compactDB();
}
}
@Test
public void close() throws Exception {
RDBStore newStore =
new RDBStore(folder.newFolder(), options, families);
Assert.assertNotNull("DBStore cannot be null", newStore);
// This test does not assert anything; if there is any error the test
// will throw and fail.
newStore.close();
}
@Test
public void moveKey() throws Exception {
byte[] key =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
byte[] value =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
try (Table firstTable = rdbStore.getTable(families.get(1))) {
firstTable.put(key, value);
try (Table secondTable = rdbStore.getTable(families.get(2))) {
rdbStore.move(key, firstTable, secondTable);
byte[] newvalue = secondTable.get(key);
// Make sure we have value in the second table
Assert.assertNotNull(newvalue);
//and it is the same as what we wrote to the FirstTable
Assert.assertArrayEquals(value, newvalue);
}
// After move this key must not exist in the first table.
Assert.assertNull(firstTable.get(key));
}
}
@Test
public void moveWithValue() throws Exception {
byte[] key =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
byte[] value =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
byte[] nextValue =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
try (Table firstTable = rdbStore.getTable(families.get(1))) {
firstTable.put(key, value);
try (Table secondTable = rdbStore.getTable(families.get(2))) {
rdbStore.move(key, nextValue, firstTable, secondTable);
byte[] newvalue = secondTable.get(key);
// Make sure we have value in the second table
Assert.assertNotNull(newvalue);
//and it is not the same as what we wrote to the FirstTable; it equals
// the new value.
Assert.assertArrayEquals(nextValue, newvalue);
}
}
}
@Test
public void getEstimatedKeyCount() throws Exception {
try (RDBStore newStore =
new RDBStore(folder.newFolder(), options, families)) {
Assert.assertNotNull("DB Store cannot be null", newStore);
// Write 100 keys to the first table.
try (Table firstTable = newStore.getTable(families.get(1))) {
Assert.assertNotNull("Table cannot be null", firstTable);
for (int x = 0; x < 100; x++) {
byte[] key =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
byte[] value =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
firstTable.put(key, value);
}
}
// Write 100 keys to the second table.
try (Table secondTable = newStore.getTable(families.get(2))) {
Assert.assertNotNull("Table cannot be null", secondTable);
for (int x = 0; x < 100; x++) {
byte[] key =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
byte[] value =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
secondTable.put(key, value);
}
}
// Let us make sure that our estimate is not off by more than 10%.
Assert.assertTrue(newStore.getEstimatedKeyCount() > 180
&& newStore.getEstimatedKeyCount() < 220);
}
}
@Test
public void getStatMBeanName() throws Exception {
try (Table firstTable = rdbStore.getTable(families.get(1))) {
for (int y = 0; y < 100; y++) {
byte[] key =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
byte[] value =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
firstTable.put(key, value);
}
}
MBeanServer platformMBeanServer =
ManagementFactory.getPlatformMBeanServer();
Thread.sleep(2000);
Object keysWritten = platformMBeanServer
.getAttribute(rdbStore.getStatMBeanName(), "NUMBER_KEYS_WRITTEN");
Assert.assertTrue(((Long) keysWritten) >= 99L);
Object dbWriteAverage = platformMBeanServer
.getAttribute(rdbStore.getStatMBeanName(), "DB_WRITE_AVERAGE");
Assert.assertTrue((double) dbWriteAverage > 0);
}
@Test
public void getTable() throws Exception {
for (String tableName : families) {
try (Table table = rdbStore.getTable(tableName)) {
Assert.assertNotNull(tableName + "is null", table);
}
}
thrown.expect(IOException.class);
rdbStore.getTable("ATableWithNoName");
}
@Test
public void listTables() throws Exception {
List<Table> tableList = rdbStore.listTables();
Assert.assertNotNull("Table list cannot be null", tableList);
Map<String, Table> hashTable = new HashMap<>();
for (Table t : tableList) {
hashTable.put(t.getName(), t);
}
int count = families.size();
// Assert that we have all the tables in the list and no more.
for (String name : families) {
Assert.assertTrue(hashTable.containsKey(name));
count--;
}
Assert.assertEquals(0, count);
}
}

TestRDBTableStore.java

@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdfs.DFSUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
import org.rocksdb.WriteBatch;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
/**
* Tests for RocksDBTable Store.
*/
public class TestRDBTableStore {
private static int count = 0;
private final List<String> families =
Arrays.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
"First", "Second", "Third",
"Fourth", "Fifth",
"Sixth");
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private RDBStore rdbStore = null;
private DBOptions options = null;
@Before
public void setUp() throws Exception {
options = new DBOptions();
options.setCreateIfMissing(true);
options.setCreateMissingColumnFamilies(true);
Statistics statistics = new Statistics();
statistics.setStatsLevel(StatsLevel.ALL);
options = options.setStatistics(statistics);
rdbStore = new RDBStore(folder.newFolder(), options, families);
}
@After
public void tearDown() throws Exception {
if (rdbStore != null) {
rdbStore.close();
}
}
@Test
public void getHandle() throws Exception {
try (Table testTable = rdbStore.getTable("First")) {
Assert.assertNotNull(testTable);
Assert.assertNotNull(testTable.getHandle());
}
}
@Test
public void putGetAndEmpty() throws Exception {
try (Table testTable = rdbStore.getTable("First")) {
byte[] key =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
byte[] value =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
testTable.put(key, value);
Assert.assertFalse(testTable.isEmpty());
byte[] readValue = testTable.get(key);
Assert.assertArrayEquals(value, readValue);
}
try (Table secondTable = rdbStore.getTable("Second")) {
Assert.assertTrue(secondTable.isEmpty());
}
}
@Test
public void delete() throws Exception {
List<byte[]> deletedKeys = new LinkedList<>();
List<byte[]> validKeys = new LinkedList<>();
byte[] value =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
for (int x = 0; x < 100; x++) {
deletedKeys.add(
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8));
}
for (int x = 0; x < 100; x++) {
validKeys.add(
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8));
}
// Write all the keys and delete the keys scheduled for delete.
//Assert we find only expected keys in the Table.
try (Table testTable = rdbStore.getTable("Fourth")) {
for (int x = 0; x < deletedKeys.size(); x++) {
testTable.put(deletedKeys.get(x), value);
testTable.delete(deletedKeys.get(x));
}
for (int x = 0; x < validKeys.size(); x++) {
testTable.put(validKeys.get(x), value);
}
for (int x = 0; x < validKeys.size(); x++) {
Assert.assertNotNull(testTable.get(validKeys.get(0)));
}
for (int x = 0; x < deletedKeys.size(); x++) {
Assert.assertNull(testTable.get(deletedKeys.get(0)));
}
}
}
@Test
public void writeBatch() throws Exception {
WriteBatch batch = new WriteBatch();
try (Table testTable = rdbStore.getTable("Fifth")) {
byte[] key =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
byte[] value =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
batch.put(testTable.getHandle(), key, value);
testTable.writeBatch(batch);
Assert.assertNotNull(testTable.get(key));
}
batch.close();
}
private static boolean consume(Table.KeyValue keyValue) {
count++;
Assert.assertNotNull(keyValue.getKey());
return true;
}
@Test
public void forEachAndIterator() throws Exception {
final int iterCount = 100;
try (Table testTable = rdbStore.getTable("Sixth")) {
for (int x = 0; x < iterCount; x++) {
byte[] key =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
byte[] value =
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
testTable.put(key, value);
}
int localCount = 0;
try (TableIterator<Table.KeyValue> iter = testTable.iterator()) {
while (iter.hasNext()) {
Assert.assertNotNull(iter.next());
localCount++;
}
Assert.assertEquals(iterCount, localCount);
iter.seekToFirst();
iter.forEachRemaining(TestRDBTableStore::consume);
Assert.assertEquals(iterCount, count);
}
}
}
}

package-info.java

@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* Tests for the DB Utilities.
*/
package org.apache.hadoop.utils.db;