diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
index 56166ab9ffc..9e0c4a4b42c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
@@ -44,6 +44,7 @@ public interface DBStore extends AutoCloseable {
    */
   Table<byte[], byte[]> getTable(String name) throws IOException;
 
+
   /**
    * Gets an existing TableStore with implicit key/value conversion.
    *
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java
index 88b0411d3e1..7bbe9d91b17 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.utils.db;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.rocksdb.ColumnFamilyHandle;
@@ -33,9 +34,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * RocksDB implementation of ozone metadata store.
+ * RocksDB implementation of ozone metadata store. This class should only be
+ * used as the underlying implementation of TypedTable to access the metadata
+ * store content. All other users of Table should use TypedTable.
  */
-public class RDBTable implements Table<byte[], byte[]> {
+@InterfaceAudience.Private
+class RDBTable implements Table<byte[], byte[]> {
 
   private static final Logger LOG =
@@ -52,7 +56,7 @@ public class RDBTable implements Table<byte[], byte[]> {
    * @param handle - ColumnFamily Handle.
    * @param writeOptions - RocksDB write Options.
    */
-  public RDBTable(RocksDB db, ColumnFamilyHandle handle,
+  RDBTable(RocksDB db, ColumnFamilyHandle handle,
       WriteOptions writeOptions) {
     this.db = db;
     this.handle = handle;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java
index 2f14e778ec1..905a68b0646 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.utils.db;
 
 import java.io.IOException;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.classification.InterfaceStability;
-
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
 /**
  * Interface for key-value store that stores ozone metadata. Ozone metadata is
  * stored as key value pairs, both key and value are arbitrary byte arrays. Each
@@ -97,6 +99,28 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
    */
   String getName() throws IOException;
 
+  /**
+   * Add an entry to the table cache.
+   *
+   * If the cacheKey already exists, the existing entry is overridden.
+   * @param cacheKey
+   * @param cacheValue
+   */
+  default void addCacheEntry(CacheKey<KEY> cacheKey,
+      CacheValue<VALUE> cacheValue) {
+    throw new NotImplementedException("addCacheEntry is not implemented");
+  }
+
+  /**
+   * Removes all entries from the table cache whose epoch value is less
+   * than or equal to the specified epoch.
+   * @param epoch
+   */
+  default void cleanupCache(long epoch) {
+    throw new NotImplementedException("cleanupCache is not implemented");
+  }
+
   /**
    * Class used to represent the key and value pair of a db entry.
    */
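To make the new cache hooks concrete, here is a hypothetical caller-side sketch, not part of the patch: it stages two updates in a table's cache under increasing epochs, then evicts everything up to epoch 1 once those transactions are known to be flushed. The table instance, keys, values, and epoch numbers are assumptions for illustration; only a TypedTable-backed Table overrides these defaults.

```java
import com.google.common.base.Optional;
import org.apache.hadoop.utils.db.Table;
import org.apache.hadoop.utils.db.cache.CacheKey;
import org.apache.hadoop.utils.db.cache.CacheValue;

/**
 * Hypothetical caller-side sketch (not part of the patch) of the new
 * Table cache API. Keys, values, and epochs are illustrative only.
 */
public final class CacheUsageSketch {
  private CacheUsageSketch() { }

  public static void stageAndCleanup(Table<String, String> table) {
    // Epoch values would normally come from the Ratis transaction log index.
    table.addCacheEntry(new CacheKey<>("volume1"),
        new CacheValue<>(Optional.of("owner=alice"), 1L));
    // Optional.absent() marks a delete that is not yet flushed to the DB.
    table.addCacheEntry(new CacheKey<>("volume2"),
        new CacheValue<>(Optional.absent(), 2L));

    // After the double buffer flushes epoch 1, evict entries with epoch <= 1.
    table.cleanupCache(1L);
  }
}
```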
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
index 667822b91d3..6de65090a92 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
@@ -20,6 +20,12 @@ package org.apache.hadoop.utils.db;
 
 import java.io.IOException;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+import org.apache.hadoop.utils.db.cache.PartialTableCache;
+import org.apache.hadoop.utils.db.cache.TableCache;
+
 /**
  * Strongly typed table implementation.
  * <p>
@@ -31,13 +37,16 @@ import java.io.IOException;
  */
 public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
-  private Table<byte[], byte[]> rawTable;
+  private final Table<byte[], byte[]> rawTable;
 
-  private CodecRegistry codecRegistry;
+  private final CodecRegistry codecRegistry;
 
-  private Class<KEY> keyType;
+  private final Class<KEY> keyType;
+
+  private final Class<VALUE> valueType;
+
+  private final TableCache<CacheKey<KEY>, CacheValue<VALUE>> cache;
 
-  private Class<VALUE> valueType;
 
   public TypedTable(
       Table<byte[], byte[]> rawTable,
@@ -47,6 +56,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
     this.codecRegistry = codecRegistry;
     this.keyType = keyType;
     this.valueType = valueType;
+    cache = new PartialTableCache<>();
   }
 
   @Override
@@ -69,8 +79,34 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
     return rawTable.isEmpty();
   }
 
+  /**
+   * Returns the value mapped to the given key, or null if the key is not
+   * found.
+   *
+   * Callers of this method should use a synchronization mechanism when
+   * accessing. The cache is checked first; if it has the entry, that value
+   * is returned, otherwise the value is read from the RocksDB table.
+   *
+   * @param key metadata key
+   * @return VALUE
+   * @throws IOException
+   */
   @Override
   public VALUE get(KEY key) throws IOException {
+    // Here the metadata lock will guarantee that the cache is not updated
+    // for the same key during get.
+    CacheValue<VALUE> cacheValue = cache.get(new CacheKey<>(key));
+    if (cacheValue == null) {
+      // If the cache has no entry for this key, get it from the RocksDB
+      // table.
+      return getFromTable(key);
+    } else {
+      // We have a value in the cache, return it.
+      return cacheValue.getValue();
+    }
+  }
+
+  private VALUE getFromTable(KEY key) throws IOException {
     byte[] keyBytes = codecRegistry.asRawData(key);
     byte[] valueBytes = rawTable.get(keyBytes);
     return codecRegistry.asObject(valueBytes, valueType);
@@ -106,6 +142,40 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
   }
 
+  @Override
+  public void addCacheEntry(CacheKey<KEY> cacheKey,
+      CacheValue<VALUE> cacheValue) {
+    // This will override the entry if there is already an entry for this key.
+    cache.put(cacheKey, cacheValue);
+  }
+
+
+  @Override
+  public void cleanupCache(long epoch) {
+    cache.cleanup(epoch);
+  }
+
+  @VisibleForTesting
+  TableCache<CacheKey<KEY>, CacheValue<VALUE>> getCache() {
+    return cache;
+  }
+
+  public Table<byte[], byte[]> getRawTable() {
+    return rawTable;
+  }
+
+  public CodecRegistry getCodecRegistry() {
+    return codecRegistry;
+  }
+
+  public Class<KEY> getKeyType() {
+    return keyType;
+  }
+
+  public Class<VALUE> getValueType() {
+    return valueType;
+  }
+
   /**
    * Key value implementation for strongly typed tables.
    */
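The cache-first get() above gives read-your-own-writes semantics before anything reaches RocksDB. A hypothetical sketch, not part of the patch (the table, keys, and values are assumed), of what that buys a caller:

```java
import com.google.common.base.Optional;
import org.apache.hadoop.utils.db.Table;
import org.apache.hadoop.utils.db.cache.CacheKey;
import org.apache.hadoop.utils.db.cache.CacheValue;

/**
 * Hypothetical read-your-own-writes sketch (not part of the patch): a value
 * staged only in the table cache is visible through get() before it is ever
 * written to RocksDB, and a staged delete (Optional.absent()) hides a value.
 */
public final class ReadPathSketch {
  private ReadPathSketch() { }

  public static void demo(Table<String, String> table) throws Exception {
    table.addCacheEntry(new CacheKey<>("bucket1"),
        new CacheValue<>(Optional.of("quota=5GB"), 1L));
    System.out.println(table.get("bucket1")); // "quota=5GB", from the cache

    table.addCacheEntry(new CacheKey<>("bucket1"),
        new CacheValue<>(Optional.absent(), 2L));
    System.out.println(table.get("bucket1")); // null: staged delete masks DB
  }
}
```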
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheKey.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheKey.java
new file mode 100644
index 00000000000..f928e4775a5
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheKey.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import java.util.Objects;
+
+/**
+ * CacheKey for the RocksDB table.
+ * @param <KEY>
+ */
+public class CacheKey<KEY> {
+
+  private final KEY key;
+
+  public CacheKey(KEY key) {
+    Objects.requireNonNull(key, "Key should not be null in CacheKey");
+    this.key = key;
+  }
+
+  public KEY getKey() {
+    return key;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CacheKey cacheKey = (CacheKey) o;
+    return Objects.equals(key, cacheKey.key);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key);
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheValue.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheValue.java
new file mode 100644
index 00000000000..34f77ae1752
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheValue.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import com.google.common.base.Optional;
+
+/**
+ * CacheValue for the RocksDB Table.
+ * @param <VALUE>
+ */
+public class CacheValue<VALUE> {
+
+  private Optional<VALUE> value;
+  // This value is used to evict entries from the cache. It is set to the
+  // Ratis transaction context log entry index.
+  private long epoch;
+
+  public CacheValue(Optional<VALUE> value, long epoch) {
+    this.value = value;
+    this.epoch = epoch;
+  }
+
+  public VALUE getValue() {
+    return value.orNull();
+  }
+
+  public long getEpoch() {
+    return epoch;
+  }
+
+}
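Since CacheKey overrides equals/hashCode by value, distinct wrapper instances around equal keys address the same ConcurrentHashMap slot. A small self-contained demonstration, illustrative only:

```java
import org.apache.hadoop.utils.db.cache.CacheKey;

/**
 * Illustrative sketch (not part of the patch): CacheKey equality is
 * value-based, so two wrappers around equal keys behave as the same map key.
 */
public final class CacheKeyEqualitySketch {
  private CacheKeyEqualitySketch() { }

  public static void main(String[] args) {
    CacheKey<String> a = new CacheKey<>(new String("volume1"));
    CacheKey<String> b = new CacheKey<>("volume1");
    // Different wrapper instances, equal contents: equals/hashCode agree.
    System.out.println(a.equals(b));                  // true
    System.out.println(a.hashCode() == b.hashCode()); // true
  }
}
```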
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/EpochEntry.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/EpochEntry.java
new file mode 100644
index 00000000000..6966b3d92d6
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/EpochEntry.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import java.util.Objects;
+
+/**
+ * Class that describes an epoch entry. It is used when deleting entries
+ * from the partial table cache.
+ * @param <CACHEKEY>
+ */
+public class EpochEntry<CACHEKEY> implements Comparable<CACHEKEY> {
+
+  private long epoch;
+  private CACHEKEY cachekey;
+
+  EpochEntry(long epoch, CACHEKEY cachekey) {
+    this.epoch = epoch;
+    this.cachekey = cachekey;
+  }
+
+  public long getEpoch() {
+    return epoch;
+  }
+
+  public CACHEKEY getCachekey() {
+    return cachekey;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    EpochEntry that = (EpochEntry) o;
+    return epoch == that.epoch && Objects.equals(cachekey, that.cachekey);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(epoch, cachekey);
+  }
+
+  public int compareTo(Object o) {
+    if (this.epoch == ((EpochEntry) o).epoch) {
+      return 0;
+    } else if (this.epoch < ((EpochEntry) o).epoch) {
+      return -1;
+    } else {
+      return 1;
+    }
+  }
+
+}
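Because EpochEntry compares by epoch, a TreeSet of entries iterates oldest-epoch-first, which is what the eviction loop in the next file relies on. A runnable sketch, illustrative only; it is placed in the cache package since EpochEntry's constructor is package-private:

```java
package org.apache.hadoop.utils.db.cache;

import java.util.TreeSet;

/**
 * Illustrative sketch (not part of the patch): EpochEntry orders by epoch,
 * so a TreeSet of entries iterates in ascending epoch order.
 */
public final class EpochOrderSketch {
  private EpochOrderSketch() { }

  public static void main(String[] args) {
    TreeSet<EpochEntry<CacheKey<String>>> entries = new TreeSet<>();
    entries.add(new EpochEntry<>(7, new CacheKey<>("c")));
    entries.add(new EpochEntry<>(2, new CacheKey<>("a")));
    entries.add(new EpochEntry<>(5, new CacheKey<>("b")));

    // Iteration follows compareTo, i.e. ascending epoch: 2, 5, 7.
    for (EpochEntry<CacheKey<String>> entry : entries) {
      System.out.println(entry.getEpoch() + " -> "
          + entry.getCachekey().getKey());
    }
  }
}
```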
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java
new file mode 100644
index 00000000000..4d3711269a1
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import java.util.Iterator;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Cache implementation for the table. This is a partial cache: entries are
+ * cleaned up after they have been flushed to the DB.
+ */
+@Private
+@Evolving
+public class PartialTableCache<CACHEKEY extends CacheKey,
+    CACHEVALUE extends CacheValue>
+    implements TableCache<CACHEKEY, CACHEVALUE> {
+
+  private final ConcurrentHashMap<CACHEKEY, CACHEVALUE> cache;
+  private final TreeSet<EpochEntry<CACHEKEY>> epochEntries;
+  private ExecutorService executorService;
+
+
+
+  public PartialTableCache() {
+    cache = new ConcurrentHashMap<>();
+    epochEntries = new TreeSet<>();
+    // Create a single-thread executor, so that only one cleanup is running
+    // at a time.
+    ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("PartialTableCache Cleanup Thread - %d").build();
+    executorService = Executors.newSingleThreadExecutor(build);
+
+  }
+
+  @Override
+  public CACHEVALUE get(CACHEKEY cachekey) {
+    return cache.get(cachekey);
+  }
+
+  @Override
+  public void put(CACHEKEY cacheKey, CACHEVALUE value) {
+    cache.put(cacheKey, value);
+    epochEntries.add(new EpochEntry<>(value.getEpoch(), cacheKey));
+  }
+
+  @Override
+  public void cleanup(long epoch) {
+    executorService.submit(() -> evictCache(epoch));
+  }
+
+  @Override
+  public int size() {
+    return cache.size();
+  }
+
+  private void evictCache(long epoch) {
+    EpochEntry<CACHEKEY> currentEntry;
+    for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
+         iterator.hasNext();) {
+      currentEntry = iterator.next();
+      CACHEKEY cachekey = currentEntry.getCachekey();
+      CacheValue cacheValue = cache.get(cachekey);
+      if (cacheValue.getEpoch() <= epoch) {
+        cache.remove(cachekey);
+        iterator.remove();
+      } else {
+        // If the current entry's epoch is greater than the given epoch, all
+        // entries with an epoch less than or equal to it have already been
+        // deleted, so we can break.
+        break;
+      }
+    }
+  }
+}
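Putting the pieces together, a hypothetical walkthrough, not part of the patch, of the eviction semantics: entries with an epoch less than or equal to the cleanup epoch disappear, the rest survive. Note that cleanup() is asynchronous, so this sketch waits crudely with a sleep; the tests instead use GenericTestUtils.waitFor.

```java
import com.google.common.base.Optional;
import org.apache.hadoop.utils.db.cache.CacheKey;
import org.apache.hadoop.utils.db.cache.CacheValue;
import org.apache.hadoop.utils.db.cache.PartialTableCache;
import org.apache.hadoop.utils.db.cache.TableCache;

/**
 * Hypothetical eviction walkthrough (not part of the patch). Keys, values,
 * and epochs are illustrative only.
 */
public final class EvictionSketch {
  private EvictionSketch() { }

  public static void main(String[] args) throws InterruptedException {
    TableCache<CacheKey<String>, CacheValue<String>> cache =
        new PartialTableCache<>();
    for (int epoch = 1; epoch <= 5; epoch++) {
      cache.put(new CacheKey<>("key" + epoch),
          new CacheValue<>(Optional.of("value" + epoch), epoch));
    }

    cache.cleanup(3);   // schedules evictCache(3) on the cleanup thread
    Thread.sleep(1000); // crude wait for the async eviction to finish

    System.out.println(cache.size());                                // 2
    System.out.println(cache.get(new CacheKey<>("key3")));           // null
    System.out.println(cache.get(new CacheKey<>("key4")).getValue()); // value4
  }
}
```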
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java
new file mode 100644
index 00000000000..70e0b33e929
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Cache used for RocksDB tables.
+ * @param <CACHEKEY>
+ * @param <CACHEVALUE>
+ */
+
+@Private
+@Evolving
+public interface TableCache<CACHEKEY extends CacheKey,
+    CACHEVALUE extends CacheValue> {
+
+  /**
+   * Return the value for the key if it is present, otherwise return null.
+   * @param cacheKey
+   * @return CACHEVALUE
+   */
+  CACHEVALUE get(CACHEKEY cacheKey);
+
+  /**
+   * Add an entry to the cache; if the key already exists, the existing
+   * entry is overridden.
+   * @param cacheKey
+   * @param value
+   */
+  void put(CACHEKEY cacheKey, CACHEVALUE value);
+
+  /**
+   * Removes all entries from the cache whose epoch value is less than or
+   * equal to the specified epoch. For a FullTable cache this is a
+   * do-nothing operation.
+   * @param epoch
+   */
+  void cleanup(long epoch);
+
+  /**
+   * Return the size of the cache.
+   * @return size
+   */
+  int size();
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/package-info.java
new file mode 100644
index 00000000000..8d2506a9bfe
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils.db.cache;
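The cleanup() javadoc alludes to a "FullTable" cache variant where cleanup is a do-nothing operation. No such class exists in this patch; the following is purely a hypothetical sketch of what such an implementation of the interface could look like:

```java
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.utils.db.cache.CacheKey;
import org.apache.hadoop.utils.db.cache.CacheValue;
import org.apache.hadoop.utils.db.cache.TableCache;

/**
 * Hypothetical "FullTable" cache sketch (not part of the patch): the whole
 * table stays cached for the lifetime of the cache, so cleanup is a no-op.
 */
public class FullTableCacheSketch<CACHEKEY extends CacheKey,
    CACHEVALUE extends CacheValue>
    implements TableCache<CACHEKEY, CACHEVALUE> {

  private final ConcurrentHashMap<CACHEKEY, CACHEVALUE> cache =
      new ConcurrentHashMap<>();

  @Override
  public CACHEVALUE get(CACHEKEY cacheKey) {
    return cache.get(cacheKey);
  }

  @Override
  public void put(CACHEKEY cacheKey, CACHEVALUE value) {
    cache.put(cacheKey, value);
  }

  @Override
  public void cleanup(long epoch) {
    // Entries live for the lifetime of the cache: nothing to evict.
  }

  @Override
  public int size() {
    return cache.size();
  }
}
```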
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java
index 4d3b1bf79c8..adedcaf52c4 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java
@@ -26,10 +26,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.db.Table.KeyValue;
 
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -51,7 +55,7 @@ public class TestTypedRDBTableStore {
       Arrays.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
           "First", "Second", "Third",
           "Fourth", "Fifth",
-          "Sixth");
+          "Sixth", "Seven");
   @Rule
   public TemporaryFolder folder = new TemporaryFolder();
   private RDBStore rdbStore = null;
@@ -236,4 +240,80 @@ public class TestTypedRDBTableStore {
       }
     }
   }
+
+  @Test
+  public void testTypedTableWithCache() throws Exception {
+    int iterCount = 10;
+    try (Table<String, String> testTable = createTypedTable(
+        "Seven")) {
+
+      for (int x = 0; x < iterCount; x++) {
+        String key = Integer.toString(x);
+        String value = Integer.toString(x);
+        testTable.addCacheEntry(new CacheKey<>(key),
+            new CacheValue<>(Optional.of(value),
+            x));
+      }
+
+      // As the entries have been added to the cache, get should return a
+      // value even though it does not exist in the DB.
+      for (int x = 0; x < iterCount; x++) {
+        Assert.assertEquals(Integer.toString(x),
+            testTable.get(Integer.toString(x)));
+      }
+
+    }
+  }
+
+  @Test
+  public void testTypedTableWithCacheWithFewDeletedOperationType()
+      throws Exception {
+    int iterCount = 10;
+    try (Table<String, String> testTable = createTypedTable(
+        "Seven")) {
+
+      for (int x = 0; x < iterCount; x++) {
+        String key = Integer.toString(x);
+        String value = Integer.toString(x);
+        if (x % 2 == 0) {
+          testTable.addCacheEntry(new CacheKey<>(key),
+              new CacheValue<>(Optional.of(value), x));
+        } else {
+          testTable.addCacheEntry(new CacheKey<>(key),
+              new CacheValue<>(Optional.absent(),
+              x));
+        }
+      }
+
+      // As the entries have been added to the cache, get should return a
+      // value even though it does not exist in the DB.
+      for (int x = 0; x < iterCount; x++) {
+        if (x % 2 == 0) {
+          Assert.assertEquals(Integer.toString(x),
+              testTable.get(Integer.toString(x)));
+        } else {
+          Assert.assertNull(testTable.get(Integer.toString(x)));
+        }
+      }
+
+      testTable.cleanupCache(5);
+
+      GenericTestUtils.waitFor(() ->
+          ((TypedTable<String, String>) testTable).getCache().size() == 4,
+          100, 5000);
+
+
+      // Check the remaining values.
+      for (int x = 6; x < iterCount; x++) {
+        if (x % 2 == 0) {
+          Assert.assertEquals(Integer.toString(x),
+              testTable.get(Integer.toString(x)));
+        } else {
+          Assert.assertNull(testTable.get(Integer.toString(x)));
+        }
+      }
+
+
+    }
+  }
 }
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java
new file mode 100644
index 00000000000..f70665960e2
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import java.util.concurrent.CompletableFuture;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Class tests partial table cache.
+ */
+public class TestPartialTableCache {
+  private TableCache<CacheKey<String>, CacheValue<String>> tableCache;
+
+  @Before
+  public void create() {
+    tableCache = new PartialTableCache<>();
+  }
+  @Test
+  public void testPartialTableCache() {
+
+
+    for (int i = 0; i < 10; i++) {
+      tableCache.put(new CacheKey<>(Integer.toString(i)),
+          new CacheValue<>(Optional.of(Integer.toString(i)), i));
+    }
+
+
+    for (int i = 0; i < 10; i++) {
+      Assert.assertEquals(Integer.toString(i),
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+    }
+
+    // Cleanup with epoch 4 evicts entries 0..4; entries 5..9 remain.
+    tableCache.cleanup(4);
+
+    for (int i = 5; i < 10; i++) {
+      Assert.assertEquals(Integer.toString(i),
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+    }
+  }
+
+
+  @Test
+  public void testPartialTableCacheParallel() throws Exception {
+
+    int totalCount = 0;
+    CompletableFuture<Integer> future =
+        CompletableFuture.supplyAsync(() -> {
+          try {
+            return writeToCache(10, 1, 0);
+          } catch (InterruptedException ex) {
+            fail("writeToCache got interrupt exception");
+          }
+          return 0;
+        });
+    int value = future.get();
+    Assert.assertEquals(10, value);
+
+    totalCount += value;
+
+    future =
+        CompletableFuture.supplyAsync(() -> {
+          try {
+            return writeToCache(10, 11, 100);
+          } catch (InterruptedException ex) {
+            fail("writeToCache got interrupt exception");
+          }
+          return 0;
+        });
+
+    // Check that we have the first 10 entries in the cache.
+    for (int i = 1; i <= 10; i++) {
+      Assert.assertEquals(Integer.toString(i),
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+    }
+
+    int deleted = 5;
+    // Clean up the first 5 entries.
+    tableCache.cleanup(deleted);
+
+    value = future.get();
+    Assert.assertEquals(10, value);
+
+    totalCount += value;
+
+    // We should have totalCount - deleted entries in the cache.
+    final int tc = totalCount;
+    GenericTestUtils.waitFor(() -> (tc - deleted == tableCache.size()), 100,
+        5000);
+
+    // Check that we have the remaining entries.
+    for (int i = 6; i <= totalCount; i++) {
+      Assert.assertEquals(Integer.toString(i),
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+    }
+
+    tableCache.cleanup(10);
+
+    tableCache.cleanup(totalCount);
+
+    // All entries have been cleaned up, so the cache size should be zero.
+    GenericTestUtils.waitFor(() -> (0 == tableCache.size()), 100,
+        5000);
+  }
+
+  private int writeToCache(int count, int startVal, long sleep)
+      throws InterruptedException {
+    int counter = 1;
+    while (counter <= count) {
+      tableCache.put(new CacheKey<>(Integer.toString(startVal)),
+          new CacheValue<>(Optional.of(Integer.toString(startVal)), startVal));
+      startVal++;
+      counter++;
+      Thread.sleep(sleep);
+    }
+    return count;
+  }
+}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/package-info.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/package-info.java
new file mode 100644
index 00000000000..b46cf614e8a
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Tests for the DB Cache Utilities.
+ */
+package org.apache.hadoop.utils.db.cache;
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 793af665db6..6987927b173 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -59,6 +59,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRE
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
 import org.eclipse.jetty.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -247,14 +248,13 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     userTable =
         this.store.getTable(USER_TABLE, String.class, VolumeList.class);
     checkTableStatus(userTable, USER_TABLE);
-    this.store.getTable(VOLUME_TABLE, String.class,
-        String.class);
     volumeTable =
         this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class);
     checkTableStatus(volumeTable, VOLUME_TABLE);
 
     bucketTable =
         this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class);
+
     checkTableStatus(bucketTable, BUCKET_TABLE);
 
     keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class);