From 91ffbaa8b9a3042a225d65c61a81bf41c53d5e33 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 24 Aug 2017 13:46:03 +0800 Subject: [PATCH] HDFS-12283. Ozone: DeleteKey-5: Implement SCM DeletedBlockLog. Contributed by Yuanbo Liu. --- .../org/apache/hadoop/ozone/OzoneConsts.java | 1 + .../org/apache/hadoop/scm/ScmConfigKeys.java | 3 + .../ozone/scm/block/BlockManagerImpl.java | 6 +- .../ozone/scm/block/DeletedBlockLog.java | 78 ++++++ .../ozone/scm/block/DeletedBlockLogImpl.java | 246 ++++++++++++++++++ .../StorageContainerDatanodeProtocol.proto | 9 + .../src/main/resources/ozone-default.xml | 13 + .../ozone/scm/block/TestDeletedBlockLog.java | 240 +++++++++++++++++ 8 files changed, 595 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 68f1e097fdb..de8061afbe5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -80,6 +80,7 @@ public final class OzoneConsts { public static final String BLOCK_DB = "block.db"; public static final String NODEPOOL_DB = "nodepool.db"; public static final String OPEN_CONTAINERS_DB = "openContainers.db"; + public static final String DELETED_BLOCK_DB = "deletedBlock.db"; public static final String KSM_DB_NAME = "ksm.db"; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index 0b081a11722..44cc3803ece 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -212,6 +212,9 @@ public final class ScmConfigKeys { public static final int OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT = 300; // Default 5 minute wait. + public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY = + "ozone.scm.block.deletion.max.retry"; + public static final int OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT = 4096; /** * Never constructed. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java index 57305896f82..43ca21cc58e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java @@ -88,6 +88,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { // Track all containers owned by block service. private final MetadataStore containerStore; + private final DeletedBlockLog deletedBlockLog; private Map> containers; @@ -142,6 +143,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { this.lock = new ReentrantLock(); mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this); + deletedBlockLog = new DeletedBlockLogImpl(conf); } // TODO: close full (or almost full) containers with a separate thread. @@ -490,7 +492,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { if (containerStore != null) { containerStore.close(); } - + if (deletedBlockLog != null) { + deletedBlockLog.close(); + } MBeans.unregister(mxBean); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java new file mode 100644 index 00000000000..60d53af45d6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.scm.block; + + +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * The DeletedBlockLog is a persisted log in SCM to keep tracking + * container blocks which are under deletion. It maintains info + * about under-deletion container blocks that notified by KSM, + * and the state how it is processed. + */ +public interface DeletedBlockLog extends Closeable { + + /** + * A limit size list of transactions. Note count is the max number + * of TXs to return, we might not be able to always return this + * number. and the processCount of those transactions + * should be [0, MAX_RETRY). + * + * @param count - number of transactions. + * @return a list of BlockDeletionTransaction. + */ + List getTransactions(int count) + throws IOException; + + /** + * Increments count for given list of transactions by 1. + * The log maintains a valid range of counts for each transaction + * [0, MAX_RETRY]. If exceed this range, resets it to -1 to indicate + * the transaction is no longer valid. + * + * @param txIDs - transaction ID. + */ + void incrementCount(List txIDs) + throws IOException; + + /** + * Commits a transaction means to delete all footprints of a transaction + * from the log. This method doesn't guarantee all transactions can be + * successfully deleted, it tolerate failures and tries best efforts to. + * + * @param txIDs - transaction IDs. + */ + void commitTransactions(List txIDs) throws IOException; + + /** + * Creates a block deletion transaction and adds that into the log. + * + * @param containerName - container name. + * @param blocks - blocks that belong to the same container. + * + * @throws IOException + */ + void addTransaction(String containerName, List blocks) + throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java new file mode 100644 index 00000000000..ef1a515c8f8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.scm.block; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Longs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB; + +/** + * A implement class of {@link DeletedBlockLog}, and it uses + * K/V db to maintain block deletion transactions between scm and datanode. + * This is a very basic implementation, it simply scans the log and + * memorize the position that scanned by last time, and uses this to + * determine where the next scan starts. It has no notion about weight + * of each transaction so as long as transaction is still valid, they get + * equally same chance to be retrieved which only depends on the nature + * order of the transaction ID. + */ +public class DeletedBlockLogImpl implements DeletedBlockLog { + + private static final Logger LOG = + LoggerFactory.getLogger(DeletedBlockLogImpl.class); + + private static final byte[] LATEST_TXID = + DFSUtil.string2Bytes("#LATEST_TXID#"); + + private final int maxRetry; + private final MetadataStore deletedStore; + private final Lock lock; + // The latest id of deleted blocks in the db. + private long lastTxID; + private long lastReadTxID; + + public DeletedBlockLogImpl(Configuration conf) throws IOException { + maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, + OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT); + + File metaDir = OzoneUtils.getScmMetadirPath(conf); + String scmMetaDataDir = metaDir.getPath(); + File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB); + int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, + OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + // Load store of all transactions. + deletedStore = MetadataStoreBuilder.newBuilder() + .setCreateIfMissing(true) + .setConf(conf) + .setDbFile(deletedLogDbPath) + .setCacheSize(cacheSize * OzoneConsts.MB) + .build(); + + this.lock = new ReentrantLock(); + // start from the head of deleted store. + lastReadTxID = 0; + lastTxID = findLatestTxIDInStore(); + } + + @VisibleForTesting + MetadataStore getDeletedStore() { + return deletedStore; + } + + /** + * There is no need to lock before reading because + * it's only used in construct method. + * + * @return latest txid. + * @throws IOException + */ + private long findLatestTxIDInStore() throws IOException { + long txid = 0; + byte[] value = deletedStore.get(LATEST_TXID); + if (value != null) { + txid = Longs.fromByteArray(value); + } + return txid; + } + + @Override + public List getTransactions( + int count) throws IOException { + List result = new ArrayList<>(); + MetadataKeyFilter getNextTxID = (preKey, currentKey, nextKey) + -> Longs.fromByteArray(currentKey) > lastReadTxID; + MetadataKeyFilter avoidInvalidTxid = (preKey, currentKey, nextKey) + -> !Arrays.equals(LATEST_TXID, currentKey); + lock.lock(); + try { + deletedStore.iterate(null, (key, value) -> { + if (getNextTxID.filterKey(null, key, null) && + avoidInvalidTxid.filterKey(null, key, null)) { + DeletedBlocksTransaction block = DeletedBlocksTransaction + .parseFrom(value); + if (block.getCount() > -1 && block.getCount() <= maxRetry) { + result.add(block); + } + } + return result.size() < count; + }); + // Scan the metadata from the beginning. + if (result.size() < count || result.size() < 1) { + lastReadTxID = 0; + } else { + lastReadTxID = result.get(result.size() - 1).getTxID(); + } + } finally { + lock.unlock(); + } + return result; + } + + /** + * {@inheritDoc} + * + * @param txIDs - transaction ID. + * @throws IOException + */ + @Override + public void incrementCount(List txIDs) throws IOException { + BatchOperation batch = new BatchOperation(); + lock.lock(); + try { + for(Long txID : txIDs) { + try { + DeletedBlocksTransaction block = DeletedBlocksTransaction + .parseFrom(deletedStore.get(Longs.toByteArray(txID))); + DeletedBlocksTransaction.Builder builder = block.toBuilder(); + if (block.getCount() > -1) { + builder.setCount(block.getCount() + 1); + } + // if the retry time exceeds the maxRetry value + // then set the retry value to -1, stop retrying, admins can + // analyze those blocks and purge them manually by SCMCli. + if (block.getCount() > maxRetry) { + builder.setCount(-1); + } + deletedStore.put(Longs.toByteArray(txID), + builder.build().toByteArray()); + } catch (IOException ex) { + LOG.warn("Cannot increase count for txID " + txID, ex); + } + } + deletedStore.writeBatch(batch); + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + * + * @param txIDs - transaction IDs. + * @throws IOException + */ + @Override + public void commitTransactions(List txIDs) throws IOException { + lock.lock(); + try { + for (Long txID : txIDs) { + try { + deletedStore.delete(Longs.toByteArray(txID)); + } catch (IOException ex) { + LOG.warn("Cannot commit txID " + txID, ex); + } + } + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + * + * @param containerName - container name. + * @param blocks - blocks that belong to the same container. + * @throws IOException + */ + @Override + public void addTransaction(String containerName, List blocks) + throws IOException { + BatchOperation batch = new BatchOperation(); + lock.lock(); + try { + DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder() + .setTxID(lastTxID + 1) + .setContainerName(containerName) + .addAllBlockID(blocks) + .setCount(0) + .build(); + byte[] key = Longs.toByteArray(lastTxID + 1); + + batch.put(key, tx.toByteArray()); + batch.put(LATEST_TXID, Longs.toByteArray(lastTxID + 1)); + + deletedStore.writeBatch(batch); + lastTxID += 1; + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws IOException { + if (deletedStore != null) { + deletedStore.close(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto index 8400ee042a1..a8cfa57e8ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -99,6 +99,15 @@ message ContainerInfo { optional int64 keycount = 4; } +// The deleted blocks which are stored in deletedBlock.db of scm. +message DeletedBlocksTransaction { + required int64 txID = 1; + required string containerName = 2; + repeated string blockID = 3; + // the retry time of sending deleting command to datanode. + required int32 count = 4; +} + /** A set of container reports, max count is generally set to 8192 since that keeps the size of the reports under 1 MB. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index e39a5ec97b6..b10a536e805 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -264,6 +264,19 @@ + + ozone.scm.block.deletion.max.retry + 4096 + + SCM wraps up a number of blocks in a deletion transaction and send that + to datanode for physically deletion periodically. This property + determines how many times at most for SCM to retry sending a deletion + transaction to datanode. The default value 4096 is relatively big so + that SCM could try enough times before giving up, as the actual deletion + is async so time required is unpredictable. + + + ozone.scm.heartbeat.log.warn.interval.count 10 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java new file mode 100644 index 00000000000..c1c87ab02c1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.scm.block; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.utils.MetadataKeyFilters; +import org.apache.hadoop.utils.MetadataStore; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY; + +/** + * Tests for DeletedBlockLog. + */ +public class TestDeletedBlockLog { + + private static DeletedBlockLogImpl deletedBlockLog; + private OzoneConfiguration conf; + private File testDir; + + @Before + public void setup() throws Exception { + testDir = GenericTestUtils.getTestDir( + TestDeletedBlockLog.class.getSimpleName()); + conf = new OzoneConfiguration(); + conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20); + conf.set(OZONE_CONTAINER_METADATA_DIRS, testDir.getAbsolutePath()); + deletedBlockLog = new DeletedBlockLogImpl(conf); + } + + @After + public void tearDown() throws Exception { + deletedBlockLog.close(); + FileUtils.deleteDirectory(testDir); + } + + private Map> generateData(int dataSize) { + Map> blockMap = new HashMap<>(); + Random random = new Random(1); + for (int i = 0; i < dataSize; i++) { + String containerName = "container-" + UUID.randomUUID().toString(); + List blocks = new ArrayList<>(); + int blockSize = random.nextInt(30) + 1; + for (int j = 0; j < blockSize; j++) { + blocks.add("block-" + UUID.randomUUID().toString()); + } + blockMap.put(containerName, blocks); + } + return blockMap; + } + + @Test + public void testGetTransactions() throws Exception { + List blocks = + deletedBlockLog.getTransactions(30); + Assert.assertEquals(0, blocks.size()); + + // Creates 40 TX in the log. + for (Map.Entry> entry : generateData(40).entrySet()){ + deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); + } + + // Get first 30 TXs. + blocks = deletedBlockLog.getTransactions(30); + Assert.assertEquals(30, blocks.size()); + for (int i = 0; i < 30; i++) { + Assert.assertEquals(i + 1, blocks.get(i).getTxID()); + } + + // Get another 30 TXs. + // The log only 10 left, so this time it will only return 10 TXs. + blocks = deletedBlockLog.getTransactions(30); + Assert.assertEquals(10, blocks.size()); + for (int i = 30; i < 40; i++) { + Assert.assertEquals(i + 1, blocks.get(i - 30).getTxID()); + } + + // Get another 50 TXs. + // By now the position should have moved to the beginning, + // this call will return all 40 TXs. + blocks = deletedBlockLog.getTransactions(50); + Assert.assertEquals(40, blocks.size()); + for (int i = 0; i < 40; i++) { + Assert.assertEquals(i + 1, blocks.get(i).getTxID()); + } + List txIDs = new ArrayList<>(); + for (DeletedBlocksTransaction block : blocks) { + txIDs.add(block.getTxID()); + } + deletedBlockLog.commitTransactions(txIDs); + } + + @Test + public void testIncrementCount() throws Exception { + int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20); + + // Create 30 TXs in the log. + for (Map.Entry> entry : generateData(30).entrySet()){ + deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); + } + + // This will return all TXs, total num 30. + List blocks = + deletedBlockLog.getTransactions(40); + List txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID) + .collect(Collectors.toList()); + + for (int i = 0; i < maxRetry; i++) { + deletedBlockLog.incrementCount(txIDs); + } + + // Increment another time so it exceed the maxRetry. + // On this call, count will be set to -1 which means TX eventually fails. + deletedBlockLog.incrementCount(txIDs); + blocks = deletedBlockLog.getTransactions(40); + for (DeletedBlocksTransaction block : blocks) { + Assert.assertEquals(-1, block.getCount()); + } + + // If all TXs are failed, getTransactions call will always return nothing. + blocks = deletedBlockLog.getTransactions(40); + Assert.assertEquals(blocks.size(), 0); + } + + @Test + public void testCommitTransactions() throws Exception { + for (Map.Entry> entry : generateData(50).entrySet()){ + deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); + } + List blocks = + deletedBlockLog.getTransactions(20); + List txIDs = new ArrayList<>(); + for (DeletedBlocksTransaction block : blocks) { + txIDs.add(block.getTxID()); + } + // Add an invalid txID. + txIDs.add(70L); + deletedBlockLog.commitTransactions(txIDs); + blocks = deletedBlockLog.getTransactions(50); + Assert.assertEquals(30, blocks.size()); + } + + @Test + public void testRandomOperateTransactions() throws Exception { + Random random = new Random(); + int added = 0, committed = 0; + List blocks = new ArrayList<>(); + List txIDs = new ArrayList<>(); + byte[] latestTxid = DFSUtil.string2Bytes("#LATEST_TXID#"); + MetadataKeyFilters.MetadataKeyFilter avoidLatestTxid = + (preKey, currentKey, nextKey) -> + !Arrays.equals(latestTxid, currentKey); + MetadataStore store = deletedBlockLog.getDeletedStore(); + // Randomly add/get/commit/increase transactions. + for (int i = 0; i < 100; i++) { + int state = random.nextInt(4); + if (state == 0) { + for (Map.Entry> entry : + generateData(10).entrySet()){ + deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); + } + added += 10; + } else if (state == 1) { + blocks = deletedBlockLog.getTransactions(20); + txIDs = new ArrayList<>(); + for (DeletedBlocksTransaction block : blocks) { + txIDs.add(block.getTxID()); + } + deletedBlockLog.incrementCount(txIDs); + } else if (state == 2) { + txIDs = new ArrayList<>(); + for (DeletedBlocksTransaction block : blocks) { + txIDs.add(block.getTxID()); + } + blocks = new ArrayList<>(); + committed += txIDs.size(); + deletedBlockLog.commitTransactions(txIDs); + } else { + // verify the number of added and committed. + List> result = + store.getRangeKVs(null, added, avoidLatestTxid); + Assert.assertEquals(added, result.size() + committed); + } + } + } + + @Test + public void testPersistence() throws Exception { + for (Map.Entry> entry : generateData(50).entrySet()){ + deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); + } + // close db and reopen it again to make sure + // transactions are stored persistently. + deletedBlockLog.close(); + deletedBlockLog = new DeletedBlockLogImpl(conf); + List blocks = + deletedBlockLog.getTransactions(10); + List txIDs = new ArrayList<>(); + for (DeletedBlocksTransaction block : blocks) { + txIDs.add(block.getTxID()); + } + deletedBlockLog.commitTransactions(txIDs); + blocks = deletedBlockLog.getTransactions(10); + Assert.assertEquals(10, blocks.size()); + } +}