diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index aa63fb48f4d..966452e105b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -48,7 +48,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -198,52 +198,54 @@ public class DeleteBlocksCommandHandler implements CommandHandler { } int newDeletionBlocks = 0; - MetadataStore containerDB = BlockUtils.getDB(containerData, conf); - for (Long blk : delTX.getLocalIDList()) { - BatchOperation batch = new BatchOperation(); - byte[] blkBytes = Longs.toByteArray(blk); - byte[] blkInfo = containerDB.get(blkBytes); - if (blkInfo != null) { - byte[] deletingKeyBytes = - DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk); - byte[] deletedKeyBytes = - DFSUtil.string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blk); - if (containerDB.get(deletingKeyBytes) != null - || containerDB.get(deletedKeyBytes) != null) { - LOG.debug(String.format( - "Ignoring delete for block %d in container %d." - + " Entry already added.", blk, containerId)); - continue; + try(ReferenceCountedDB containerDB = + BlockUtils.getDB(containerData, conf)) { + for (Long blk : delTX.getLocalIDList()) { + BatchOperation batch = new BatchOperation(); + byte[] blkBytes = Longs.toByteArray(blk); + byte[] blkInfo = containerDB.getStore().get(blkBytes); + if (blkInfo != null) { + byte[] deletingKeyBytes = + DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk); + byte[] deletedKeyBytes = + DFSUtil.string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blk); + if (containerDB.getStore().get(deletingKeyBytes) != null + || containerDB.getStore().get(deletedKeyBytes) != null) { + LOG.debug(String.format( + "Ignoring delete for block %d in container %d." + + " Entry already added.", blk, containerId)); + continue; + } + // Found the block in container db, + // use an atomic update to change its state to deleting. + batch.put(deletingKeyBytes, blkInfo); + batch.delete(blkBytes); + try { + containerDB.getStore().writeBatch(batch); + newDeletionBlocks++; + LOG.debug("Transited Block {} to DELETING state in container {}", + blk, containerId); + } catch (IOException e) { + // if some blocks failed to delete, we fail this TX, + // without sending this ACK to SCM, SCM will resend the TX + // with a certain number of retries. + throw new IOException( + "Failed to delete blocks for TXID = " + delTX.getTxID(), e); + } + } else { + LOG.debug("Block {} not found or already under deletion in" + + " container {}, skip deleting it.", blk, containerId); } - // Found the block in container db, - // use an atomic update to change its state to deleting. 
- batch.put(deletingKeyBytes, blkInfo); - batch.delete(blkBytes); - try { - containerDB.writeBatch(batch); - newDeletionBlocks++; - LOG.debug("Transited Block {} to DELETING state in container {}", - blk, containerId); - } catch (IOException e) { - // if some blocks failed to delete, we fail this TX, - // without sending this ACK to SCM, SCM will resend the TX - // with a certain number of retries. - throw new IOException( - "Failed to delete blocks for TXID = " + delTX.getTxID(), e); - } - } else { - LOG.debug("Block {} not found or already under deletion in" - + " container {}, skip deleting it.", blk, containerId); } - } - containerDB - .put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX), - Longs.toByteArray(delTX.getTxID())); - containerData - .updateDeleteTransactionId(delTX.getTxID()); - // update pending deletion blocks count in in-memory container status - containerData.incrPendingDeletionBlocks(newDeletionBlocks); + containerDB.getStore() + .put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX), + Longs.toByteArray(delTX.getTxID())); + containerData + .updateDeleteTransactionId(delTX.getTxID()); + // update pending deletion blocks count in in-memory container status + containerData.incrPendingDeletionBlocks(newDeletionBlocks); + } } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java index 25d1bdf2918..c15bef0c0cf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java @@ -28,8 +28,11 @@ import org.apache.hadoop.utils.MetadataStoreBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -92,8 +95,8 @@ public final class ContainerCache extends LRUMap { MapIterator iterator = cache.mapIterator(); while (iterator.hasNext()) { iterator.next(); - MetadataStore db = (MetadataStore) iterator.getValue(); - closeDB((String)iterator.getKey(), db); + ReferenceCountedDB db = (ReferenceCountedDB) iterator.getValue(); + db.setEvicted(true); } // reset the cache cache.clear(); @@ -107,11 +110,11 @@ public final class ContainerCache extends LRUMap { */ @Override protected boolean removeLRU(LinkEntry entry) { - MetadataStore db = (MetadataStore) entry.getValue(); + ReferenceCountedDB db = (ReferenceCountedDB) entry.getValue(); String dbFile = (String)entry.getKey(); lock.lock(); try { - closeDB(dbFile, db); + db.setEvicted(false); return true; } catch (Exception e) { LOG.error("Eviction for db:{} failed", dbFile, e); @@ -128,26 +131,30 @@ public final class ContainerCache extends LRUMap { * @param containerDBType - DB type of the container. * @param containerDBPath - DB path of the container. * @param conf - Hadoop Configuration. - * @return MetadataStore. + * @return ReferenceCountedDB. 
   */
-  public MetadataStore getDB(long containerID, String containerDBType,
+  public ReferenceCountedDB getDB(long containerID, String containerDBType,
                              String containerDBPath, Configuration conf)
       throws IOException {
     Preconditions.checkState(containerID >= 0,
         "Container ID cannot be negative.");
     lock.lock();
     try {
-      MetadataStore db = (MetadataStore) this.get(containerDBPath);
+      ReferenceCountedDB db = (ReferenceCountedDB) this.get(containerDBPath);
       if (db == null) {
-        db = MetadataStoreBuilder.newBuilder()
+        MetadataStore metadataStore =
+            MetadataStoreBuilder.newBuilder()
             .setDbFile(new File(containerDBPath))
             .setCreateIfMissing(false)
             .setConf(conf)
             .setDBType(containerDBType)
             .build();
+        db = new ReferenceCountedDB(metadataStore, containerDBPath);
         this.put(containerDBPath, db);
       }
+      // increment the reference before returning the object
+      db.incrementReference();
       return db;
     } catch (Exception e) {
       LOG.error("Error opening DB. Container:{} ContainerPath:{}",
@@ -161,16 +168,70 @@
   /**
    * Remove a DB handler from cache.
    *
-   * @param containerPath - path of the container db file.
+   * @param containerDBPath - path of the container db file.
    */
-  public void removeDB(String containerPath) {
+  public void removeDB(String containerDBPath) {
     lock.lock();
     try {
-      MetadataStore db = (MetadataStore)this.get(containerPath);
-      closeDB(containerPath, db);
-      this.remove(containerPath);
+      ReferenceCountedDB db = (ReferenceCountedDB)this.get(containerDBPath);
+      if (db != null) {
+        // marking it as evicted will close the db as well.
+        db.setEvicted(true);
+      }
+      this.remove(containerDBPath);
     } finally {
       lock.unlock();
     }
   }
+
+
+  /**
+   * Class to implement reference counting over instances handed out by the
+   * Container Cache.
+   */
+  public class ReferenceCountedDB implements Closeable {
+    private final AtomicInteger referenceCount;
+    private final AtomicBoolean isEvicted;
+    private final MetadataStore store;
+    private final String containerDBPath;
+
+    public ReferenceCountedDB(MetadataStore store, String containerDBPath) {
+      this.referenceCount = new AtomicInteger(0);
+      this.isEvicted = new AtomicBoolean(false);
+      this.store = store;
+      this.containerDBPath = containerDBPath;
+    }
+
+    private void incrementReference() {
+      this.referenceCount.incrementAndGet();
+    }
+
+    private void decrementReference() {
+      this.referenceCount.decrementAndGet();
+      cleanup();
+    }
+
+    private void setEvicted(boolean checkNoReferences) {
+      Preconditions.checkState(!checkNoReferences ||
+          (referenceCount.get() == 0),
+          "checkNoReferences:%s, referenceCount:%s",
+          checkNoReferences, referenceCount.get());
+      isEvicted.set(true);
+      cleanup();
+    }
+
+    private void cleanup() {
+      if (referenceCount.get() == 0 && isEvicted.get() && store != null) {
+        closeDB(containerDBPath, store);
+      }
+    }
+
+    public MetadataStore getStore() {
+      return store;
+    }
+
+    public void close() {
+      decrementReference();
+    }
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java
index 535af29c190..f1b71b89a93 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java
@@ -31,11 +31,12 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocat
 import org.apache.hadoop.utils.MetaStoreIterator;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.apache.hadoop.utils.MetadataStore.KeyValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.NoSuchElementException;
@@ -48,12 +49,14 @@ import java.util.NoSuchElementException;
  * {@link MetadataKeyFilters#getNormalKeyFilter()}
  */
 @InterfaceAudience.Public
-public class KeyValueBlockIterator implements BlockIterator<BlockData> {
+public class KeyValueBlockIterator implements BlockIterator<BlockData>,
+    Closeable {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       KeyValueBlockIterator.class);
 
   private MetaStoreIterator<KeyValue> blockIterator;
+  private final ReferenceCountedDB db;
   private static KeyPrefixFilter defaultBlockFilter = MetadataKeyFilters
       .getNormalKeyFilter();
   private KeyPrefixFilter blockFilter;
@@ -91,9 +94,9 @@
         containerData;
     keyValueContainerData.setDbFile(KeyValueContainerLocationUtil
         .getContainerDBFile(metdataPath, containerId));
-    MetadataStore metadataStore = BlockUtils.getDB(keyValueContainerData, new
+    db = BlockUtils.getDB(keyValueContainerData, new
         OzoneConfiguration());
-    blockIterator = metadataStore.iterator();
+    blockIterator = db.getStore().iterator();
     blockFilter = filter;
   }
 
@@ -145,4 +148,8 @@
     nextBlock = null;
     blockIterator.seekToLast();
   }
+
+  public void close() {
+    db.close();
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 26b0ce1d788..8d5ec72b980 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers
     .KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.utils.MetadataStore;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
@@ -74,6 +73,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.UNSUPPORTED_REQUEST;
 
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -349,11 +349,12 @@
   void compactDB() throws StorageContainerException {
     try {
-      MetadataStore db = BlockUtils.getDB(containerData, config);
-      db.compactDB();
-      LOG.info("Container {} is closed with bcsId {}.",
-          containerData.getContainerID(),
-          containerData.getBlockCommitSequenceId());
+      try (ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
+        db.getStore().compactDB();
+        LOG.info("Container {} is closed with bcsId {}.",
+            containerData.getContainerID(),
+            containerData.getBlockCommitSequenceId());
+      }
     } catch (StorageContainerException ex) {
       throw ex;
     } catch (IOException ex) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index bdfdf21b4c2..4043914c89d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -30,12 +30,12 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
-import org.apache.hadoop.utils.MetadataStore;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -236,41 +236,42 @@
     onDiskContainerData.setDbFile(dbFile);
 
-    MetadataStore db = BlockUtils
-        .getDB(onDiskContainerData, checkConfig);
-
-    iterateBlockDB(db);
+    try (ReferenceCountedDB db =
+        BlockUtils.getDB(onDiskContainerData, checkConfig)) {
+      iterateBlockDB(db);
+    }
   }
 
-  private void iterateBlockDB(MetadataStore db)
+  private void iterateBlockDB(ReferenceCountedDB db)
       throws IOException {
     Preconditions.checkState(db != null);
     // get "normal" keys from the Block DB
-    KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
-        new File(onDiskContainerData.getContainerPath()));
+    try (KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
+        new File(onDiskContainerData.getContainerPath()))) {
 
-    // ensure there is a chunk file for each key in the DB
-    while (kvIter.hasNext()) {
-      BlockData block = kvIter.nextBlock();
+      // ensure there is a chunk file for each key in the DB
+      while (kvIter.hasNext()) {
+        BlockData block = kvIter.nextBlock();
 
-      List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
-      for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
-        File chunkFile;
-        chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
-            ChunkInfo.getFromProtoBuf(chunk));
+        List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
+        for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
+          File chunkFile;
+          chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
+              ChunkInfo.getFromProtoBuf(chunk));
 
-        if (!chunkFile.exists()) {
-          // concurrent mutation in Block DB? lookup the block again.
-          byte[] bdata = db.get(
-              Longs.toByteArray(block.getBlockID().getLocalID()));
-          if (bdata == null) {
-            LOG.trace("concurrency with delete, ignoring deleted block");
-            break; // skip to next block from kvIter
-          } else {
-            String errorStr = "Missing chunk file "
-                + chunkFile.getAbsolutePath();
-            throw new IOException(errorStr);
+          if (!chunkFile.exists()) {
+            // concurrent mutation in Block DB? lookup the block again.
+            byte[] bdata = db.getStore().get(
+                Longs.toByteArray(block.getBlockID().getLocalID()));
+            if (bdata == null) {
+              LOG.trace("concurrency with delete, ignoring deleted block");
+              break; // skip to next block from kvIter
+            } else {
+              String errorStr = "Missing chunk file "
+                  + chunkFile.getAbsolutePath();
+              throw new IOException(errorStr);
+            }
           }
         }
       }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
index 996b5922fe5..fd3c7688f6c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 
 import java.io.IOException;
 
@@ -66,7 +66,7 @@ public final class BlockUtils {
    * @return MetadataStore handle.
    * @throws StorageContainerException
    */
-  public static MetadataStore getDB(KeyValueContainerData containerData,
+  public static ReferenceCountedDB getDB(KeyValueContainerData containerData,
       Configuration conf) throws
       StorageContainerException {
     Preconditions.checkNotNull(containerData);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index 7a309555a37..377536a1c91 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.utils.MetadataStoreBuilder;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -174,22 +175,25 @@
     }
     kvContainerData.setDbFile(dbFile);
 
-    MetadataStore metadata = BlockUtils.getDB(kvContainerData, config);
-    long bytesUsed = 0;
-    List<Map.Entry<byte[], byte[]>> liveKeys = metadata
-        .getRangeKVs(null, Integer.MAX_VALUE,
-            MetadataKeyFilters.getNormalKeyFilter());
-    bytesUsed = liveKeys.parallelStream().mapToLong(e-> {
-      BlockData blockData;
-      try {
-        blockData = BlockUtils.getBlockData(e.getValue());
-        return blockData.getSize();
-      } catch (IOException ex) {
-        return 0L;
-      }
-    }).sum();
-    kvContainerData.setBytesUsed(bytesUsed);
-    kvContainerData.setKeyCount(liveKeys.size());
+    try (ReferenceCountedDB metadata =
+        BlockUtils.getDB(kvContainerData, config)) {
+      long bytesUsed = 0;
+      List<Map.Entry<byte[], byte[]>> liveKeys = metadata.getStore()
+          .getRangeKVs(null, Integer.MAX_VALUE,
+              MetadataKeyFilters.getNormalKeyFilter());
+
+      bytesUsed = liveKeys.parallelStream().mapToLong(e-> {
+        BlockData blockData;
+        try {
+          blockData = BlockUtils.getBlockData(e.getValue());
+          return blockData.getSize();
+        } catch (IOException ex) {
+          return 0L;
+        }
+      }).sum();
+      kvContainerData.setBytesUsed(bytesUsed);
+      kvContainerData.setKeyCount(liveKeys.size());
+    }
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 3033dd9017d..f62a013f4cc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
 import org.apache.hadoop.utils.BatchOperation;
 import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,47 +84,47 @@
         "cannot be negative");
     // We are not locking the key manager since LevelDb serializes all actions
     // against a single DB. We rely on DB level locking to avoid conflicts.
-    MetadataStore db = BlockUtils.getDB((KeyValueContainerData) container
-        .getContainerData(), config);
+    try (ReferenceCountedDB db = BlockUtils.
+        getDB((KeyValueContainerData) container.getContainerData(), config)) {
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.
+      Preconditions.checkNotNull(db, "DB cannot be null here");
 
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
+      long bcsId = data.getBlockCommitSequenceId();
+      long containerBCSId = ((KeyValueContainerData) container.
+          getContainerData()).getBlockCommitSequenceId();
 
-    long bcsId = data.getBlockCommitSequenceId();
-    long containerBCSId = ((KeyValueContainerData) container.getContainerData())
-        .getBlockCommitSequenceId();
-
-    // default blockCommitSequenceId for any block is 0. It the putBlock
-    // request is not coming via Ratis(for test scenarios), it will be 0.
-    // In such cases, we should overwrite the block as well
-    if (bcsId != 0) {
-      if (bcsId <= containerBCSId) {
-        // Since the blockCommitSequenceId stored in the db is greater than
-        // equal to blockCommitSequenceId to be updated, it means the putBlock
-        // transaction is reapplied in the ContainerStateMachine on restart.
-        // It also implies that the given block must already exist in the db.
-        // just log and return
-        LOG.warn("blockCommitSequenceId " + containerBCSId
-            + " in the Container Db is greater than" + " the supplied value "
-            + bcsId + " .Ignoring it");
-        return data.getSize();
+      // default blockCommitSequenceId for any block is 0. If the putBlock
+      // request is not coming via Ratis (for test scenarios), it will be 0.
+      // In such cases, we should overwrite the block as well
+      if (bcsId != 0) {
+        if (bcsId <= containerBCSId) {
+          // Since the blockCommitSequenceId stored in the db is greater than or
+          // equal to blockCommitSequenceId to be updated, it means the putBlock
+          // transaction is reapplied in the ContainerStateMachine on restart.
+          // It also implies that the given block must already exist in the db.
+ // just log and return + LOG.warn("blockCommitSequenceId " + containerBCSId + + " in the Container Db is greater than" + " the supplied value " + + bcsId + " .Ignoring it"); + return data.getSize(); + } } + // update the blockData as well as BlockCommitSequenceId here + BatchOperation batch = new BatchOperation(); + batch.put(Longs.toByteArray(data.getLocalID()), + data.getProtoBufMessage().toByteArray()); + batch.put(blockCommitSequenceIdKey, + Longs.toByteArray(bcsId)); + db.getStore().writeBatch(batch); + container.updateBlockCommitSequenceId(bcsId); + // Increment keycount here + container.getContainerData().incrKeyCount(); + LOG.debug( + "Block " + data.getBlockID() + " successfully committed with bcsId " + + bcsId + " chunk size " + data.getChunks().size()); + return data.getSize(); } - // update the blockData as well as BlockCommitSequenceId here - BatchOperation batch = new BatchOperation(); - batch.put(Longs.toByteArray(data.getLocalID()), - data.getProtoBufMessage().toByteArray()); - batch.put(blockCommitSequenceIdKey, - Longs.toByteArray(bcsId)); - db.writeBatch(batch); - container.updateBlockCommitSequenceId(bcsId); - // Increment keycount here - container.getContainerData().incrKeyCount(); - LOG.debug( - "Block " + data.getBlockID() + " successfully committed with bcsId " - + bcsId + " chunk size " + data.getChunks().size()); - return data.getSize(); } /** @@ -146,32 +146,33 @@ public class BlockManagerImpl implements BlockManager { KeyValueContainerData containerData = (KeyValueContainerData) container .getContainerData(); - MetadataStore db = BlockUtils.getDB(containerData, config); - // This is a post condition that acts as a hint to the user. - // Should never fail. - Preconditions.checkNotNull(db, "DB cannot be null here"); + try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) { + // This is a post condition that acts as a hint to the user. + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); - long containerBCSId = containerData.getBlockCommitSequenceId(); - if (containerBCSId < bcsId) { - throw new StorageContainerException( - "Unable to find the block with bcsID " + bcsId + " .Container " - + container.getContainerData().getContainerID() + " bcsId is " - + containerBCSId + ".", UNKNOWN_BCSID); + long containerBCSId = containerData.getBlockCommitSequenceId(); + if (containerBCSId < bcsId) { + throw new StorageContainerException( + "Unable to find the block with bcsID " + bcsId + " .Container " + + container.getContainerData().getContainerID() + " bcsId is " + + containerBCSId + ".", UNKNOWN_BCSID); + } + byte[] kData = db.getStore().get(Longs.toByteArray(blockID.getLocalID())); + if (kData == null) { + throw new StorageContainerException("Unable to find the block." + + blockID, NO_SUCH_BLOCK); + } + ContainerProtos.BlockData blockData = + ContainerProtos.BlockData.parseFrom(kData); + long id = blockData.getBlockID().getBlockCommitSequenceId(); + if (id < bcsId) { + throw new StorageContainerException( + "bcsId " + bcsId + " mismatches with existing block Id " + + id + " for block " + blockID + ".", BCSID_MISMATCH); + } + return BlockData.getFromProtoBuf(blockData); } - byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID())); - if (kData == null) { - throw new StorageContainerException("Unable to find the block." 
+ blockID, - NO_SUCH_BLOCK); - } - ContainerProtos.BlockData blockData = - ContainerProtos.BlockData.parseFrom(kData); - long id = blockData.getBlockID().getBlockCommitSequenceId(); - if (id < bcsId) { - throw new StorageContainerException( - "bcsId " + bcsId + " mismatches with existing block Id " - + id + " for block " + blockID + ".", BCSID_MISMATCH); - } - return BlockData.getFromProtoBuf(blockData); } /** @@ -187,18 +188,19 @@ public class BlockManagerImpl implements BlockManager { throws IOException { KeyValueContainerData containerData = (KeyValueContainerData) container .getContainerData(); - MetadataStore db = BlockUtils.getDB(containerData, config); - // This is a post condition that acts as a hint to the user. - // Should never fail. - Preconditions.checkNotNull(db, "DB cannot be null here"); - byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID())); - if (kData == null) { - throw new StorageContainerException("Unable to find the block.", - NO_SUCH_BLOCK); + try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) { + // This is a post condition that acts as a hint to the user. + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); + byte[] kData = db.getStore().get(Longs.toByteArray(blockID.getLocalID())); + if (kData == null) { + throw new StorageContainerException("Unable to find the block.", + NO_SUCH_BLOCK); + } + ContainerProtos.BlockData blockData = + ContainerProtos.BlockData.parseFrom(kData); + return blockData.getSize(); } - ContainerProtos.BlockData blockData = - ContainerProtos.BlockData.parseFrom(kData); - return blockData.getSize(); } /** @@ -218,24 +220,24 @@ public class BlockManagerImpl implements BlockManager { KeyValueContainerData cData = (KeyValueContainerData) container .getContainerData(); - MetadataStore db = BlockUtils.getDB(cData, config); - // This is a post condition that acts as a hint to the user. - // Should never fail. - Preconditions.checkNotNull(db, "DB cannot be null here"); - // Note : There is a race condition here, since get and delete - // are not atomic. Leaving it here since the impact is refusing - // to delete a Block which might have just gotten inserted after - // the get check. - byte[] kKey = Longs.toByteArray(blockID.getLocalID()); - byte[] kData = db.get(kKey); - if (kData == null) { - throw new StorageContainerException("Unable to find the block.", - NO_SUCH_BLOCK); + try(ReferenceCountedDB db = BlockUtils.getDB(cData, config)) { + // This is a post condition that acts as a hint to the user. + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); + // Note : There is a race condition here, since get and delete + // are not atomic. Leaving it here since the impact is refusing + // to delete a Block which might have just gotten inserted after + // the get check. 
+ byte[] kKey = Longs.toByteArray(blockID.getLocalID()); + try { + db.getStore().delete(kKey); + } catch (IOException e) { + throw new StorageContainerException("Unable to find the block.", + NO_SUCH_BLOCK); + } + // Decrement blockcount here + container.getContainerData().decrKeyCount(); } - db.delete(kKey); - - // Decrement blockcount here - container.getContainerData().decrKeyCount(); } /** @@ -258,18 +260,19 @@ public class BlockManagerImpl implements BlockManager { List result = null; KeyValueContainerData cData = (KeyValueContainerData) container .getContainerData(); - MetadataStore db = BlockUtils.getDB(cData, config); - result = new ArrayList<>(); - byte[] startKeyInBytes = Longs.toByteArray(startLocalID); - List> range = - db.getSequentialRangeKVs(startKeyInBytes, count, - MetadataKeyFilters.getNormalKeyFilter()); - for (Map.Entry entry : range) { - BlockData value = BlockUtils.getBlockData(entry.getValue()); - BlockData data = new BlockData(value.getBlockID()); - result.add(data); + try(ReferenceCountedDB db = BlockUtils.getDB(cData, config)) { + result = new ArrayList<>(); + byte[] startKeyInBytes = Longs.toByteArray(startLocalID); + List> range = + db.getStore().getSequentialRangeKVs(startKeyInBytes, count, + MetadataKeyFilters.getNormalKeyFilter()); + for (Map.Entry entry : range) { + BlockData value = BlockUtils.getBlockData(entry.getValue()); + BlockData data = new BlockData(value.getBlockID()); + result.add(data); + } + return result; } - return result; } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java index 61a303fcdd2..c03bea791fc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java @@ -43,7 +43,7 @@ import org.apache.hadoop.utils.BackgroundTaskQueue; import org.apache.hadoop.utils.BackgroundTaskResult; import org.apache.hadoop.utils.BatchOperation; import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; -import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -185,69 +185,71 @@ public class BlockDeletingService extends BackgroundService{ ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult(); long startTime = Time.monotonicNow(); // Scan container's db and get list of under deletion blocks - MetadataStore meta = BlockUtils.getDB( - (KeyValueContainerData) containerData, conf); - // # of blocks to delete is throttled - KeyPrefixFilter filter = - new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX); - List> toDeleteBlocks = - meta.getSequentialRangeKVs(null, blockLimitPerTask, filter); - if (toDeleteBlocks.isEmpty()) { - LOG.debug("No under deletion block found in container : {}", - containerData.getContainerID()); - } + try (ReferenceCountedDB meta = BlockUtils.getDB(containerData, conf)) { + // # of blocks to delete is throttled + KeyPrefixFilter filter = + new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX); + List> toDeleteBlocks = + meta.getStore().getSequentialRangeKVs(null, blockLimitPerTask, + 
filter); + if (toDeleteBlocks.isEmpty()) { + LOG.debug("No under deletion block found in container : {}", + containerData.getContainerID()); + } - List succeedBlocks = new LinkedList<>(); - LOG.debug("Container : {}, To-Delete blocks : {}", - containerData.getContainerID(), toDeleteBlocks.size()); - File dataDir = new File(containerData.getChunksPath()); - if (!dataDir.exists() || !dataDir.isDirectory()) { - LOG.error("Invalid container data dir {} : " - + "does not exist or not a directory", dataDir.getAbsolutePath()); + List succeedBlocks = new LinkedList<>(); + LOG.debug("Container : {}, To-Delete blocks : {}", + containerData.getContainerID(), toDeleteBlocks.size()); + File dataDir = new File(containerData.getChunksPath()); + if (!dataDir.exists() || !dataDir.isDirectory()) { + LOG.error("Invalid container data dir {} : " + + "does not exist or not a directory", dataDir.getAbsolutePath()); + return crr; + } + + toDeleteBlocks.forEach(entry -> { + String blockName = DFSUtil.bytes2String(entry.getKey()); + LOG.debug("Deleting block {}", blockName); + try { + ContainerProtos.BlockData data = + ContainerProtos.BlockData.parseFrom(entry.getValue()); + for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) { + File chunkFile = dataDir.toPath() + .resolve(chunkInfo.getChunkName()).toFile(); + if (FileUtils.deleteQuietly(chunkFile)) { + LOG.debug("block {} chunk {} deleted", blockName, + chunkFile.getAbsolutePath()); + } + } + succeedBlocks.add(blockName); + } catch (InvalidProtocolBufferException e) { + LOG.error("Failed to parse block info for block {}", blockName, e); + } + }); + + // Once files are deleted... replace deleting entries with deleted + // entries + BatchOperation batch = new BatchOperation(); + succeedBlocks.forEach(entry -> { + String blockId = + entry.substring(OzoneConsts.DELETING_KEY_PREFIX.length()); + String deletedEntry = OzoneConsts.DELETED_KEY_PREFIX + blockId; + batch.put(DFSUtil.string2Bytes(deletedEntry), + DFSUtil.string2Bytes(blockId)); + batch.delete(DFSUtil.string2Bytes(entry)); + }); + meta.getStore().writeBatch(batch); + // update count of pending deletion blocks in in-memory container status + containerData.decrPendingDeletionBlocks(succeedBlocks.size()); + + if (!succeedBlocks.isEmpty()) { + LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms", + containerData.getContainerID(), succeedBlocks.size(), + Time.monotonicNow() - startTime); + } + crr.addAll(succeedBlocks); return crr; } - - toDeleteBlocks.forEach(entry -> { - String blockName = DFSUtil.bytes2String(entry.getKey()); - LOG.debug("Deleting block {}", blockName); - try { - ContainerProtos.BlockData data = - ContainerProtos.BlockData.parseFrom(entry.getValue()); - for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) { - File chunkFile = dataDir.toPath() - .resolve(chunkInfo.getChunkName()).toFile(); - if (FileUtils.deleteQuietly(chunkFile)) { - LOG.debug("block {} chunk {} deleted", blockName, - chunkFile.getAbsolutePath()); - } - } - succeedBlocks.add(blockName); - } catch (InvalidProtocolBufferException e) { - LOG.error("Failed to parse block info for block {}", blockName, e); - } - }); - - // Once files are deleted... 
replace deleting entries with deleted entries - BatchOperation batch = new BatchOperation(); - succeedBlocks.forEach(entry -> { - String blockId = - entry.substring(OzoneConsts.DELETING_KEY_PREFIX.length()); - String deletedEntry = OzoneConsts.DELETED_KEY_PREFIX + blockId; - batch.put(DFSUtil.string2Bytes(deletedEntry), - DFSUtil.string2Bytes(blockId)); - batch.delete(DFSUtil.string2Bytes(entry)); - }); - meta.writeBatch(batch); - // update count of pending deletion blocks in in-memory container status - containerData.decrPendingDeletionBlocks(succeedBlocks.size()); - - if (!succeedBlocks.isEmpty()) { - LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms", - containerData.getContainerID(), succeedBlocks.size(), - Time.monotonicNow() - startTime); - } - crr.addAll(succeedBlocks); - return crr; } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java index 0192fd5dd1b..d5455aa3e7e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java @@ -38,7 +38,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; import org.apache.hadoop.utils.MetadataKeyFilters; -import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -180,28 +180,31 @@ public class ContainerReader implements Runnable { KeyValueContainerUtil.parseKVContainerData(kvContainerData, config); KeyValueContainer kvContainer = new KeyValueContainer( kvContainerData, config); - MetadataStore containerDB = BlockUtils.getDB(kvContainerData, config); - MetadataKeyFilters.KeyPrefixFilter filter = - new MetadataKeyFilters.KeyPrefixFilter() - .addFilter(OzoneConsts.DELETING_KEY_PREFIX); - int numPendingDeletionBlocks = - containerDB.getSequentialRangeKVs(null, Integer.MAX_VALUE, filter) - .size(); - kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks); - byte[] delTxnId = containerDB.get( - DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX)); - if (delTxnId != null) { - kvContainerData - .updateDeleteTransactionId(Longs.fromByteArray(delTxnId)); + try(ReferenceCountedDB containerDB = BlockUtils.getDB(kvContainerData, + config)) { + MetadataKeyFilters.KeyPrefixFilter filter = + new MetadataKeyFilters.KeyPrefixFilter() + .addFilter(OzoneConsts.DELETING_KEY_PREFIX); + int numPendingDeletionBlocks = + containerDB.getStore().getSequentialRangeKVs(null, + Integer.MAX_VALUE, filter) + .size(); + kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks); + byte[] delTxnId = containerDB.getStore().get( + DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX)); + if (delTxnId != null) { + kvContainerData + .updateDeleteTransactionId(Longs.fromByteArray(delTxnId)); + } + // sets the BlockCommitSequenceId. 
+ byte[] bcsId = containerDB.getStore().get(DFSUtil.string2Bytes( + OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX)); + if (bcsId != null) { + kvContainerData + .updateBlockCommitSequenceId(Longs.fromByteArray(bcsId)); + } + containerSet.addContainer(kvContainer); } - // sets the BlockCommitSequenceId. - byte[] bcsId = containerDB.get( - DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX)); - if (bcsId != null) { - kvContainerData - .updateBlockCommitSequenceId(Longs.fromByteArray(bcsId)); - } - containerSet.addContainer(kvContainer); } else { throw new StorageContainerException("Container File is corrupted. " + "ContainerType is KeyValueContainer but cast to " + diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java index 15d7b342d4a..687e64e16b1 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java @@ -34,7 +34,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.utils.MetadataKeyFilters; -import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -109,30 +109,31 @@ public class TestKeyValueBlockIterator { createContainerWithBlocks(containerID, normalBlocks, deletedBlocks); String containerPath = new File(containerData.getMetadataPath()) .getParent(); - KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( - containerID, new File(containerPath)); + try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( + containerID, new File(containerPath))) { - int counter = 0; - while(keyValueBlockIterator.hasNext()) { - BlockData blockData = keyValueBlockIterator.nextBlock(); - assertEquals(blockData.getLocalID(), counter++); - } + int counter = 0; + while (keyValueBlockIterator.hasNext()) { + BlockData blockData = keyValueBlockIterator.nextBlock(); + assertEquals(blockData.getLocalID(), counter++); + } - assertFalse(keyValueBlockIterator.hasNext()); + assertFalse(keyValueBlockIterator.hasNext()); - keyValueBlockIterator.seekToFirst(); - counter = 0; - while(keyValueBlockIterator.hasNext()) { - BlockData blockData = keyValueBlockIterator.nextBlock(); - assertEquals(blockData.getLocalID(), counter++); - } - assertFalse(keyValueBlockIterator.hasNext()); + keyValueBlockIterator.seekToFirst(); + counter = 0; + while (keyValueBlockIterator.hasNext()) { + BlockData blockData = keyValueBlockIterator.nextBlock(); + assertEquals(blockData.getLocalID(), counter++); + } + assertFalse(keyValueBlockIterator.hasNext()); - try { - keyValueBlockIterator.nextBlock(); - } catch (NoSuchElementException ex) { - GenericTestUtils.assertExceptionContains("Block Iterator reached end " + - "for ContainerID " + containerID, ex); + try { + keyValueBlockIterator.nextBlock(); + } catch (NoSuchElementException ex) { + GenericTestUtils.assertExceptionContains("Block Iterator reached end " + + "for ContainerID " + containerID, ex); + } } } @@ -142,17 +143,18 @@ public class 
TestKeyValueBlockIterator { createContainerWithBlocks(containerID, 2, 0); String containerPath = new File(containerData.getMetadataPath()) .getParent(); - KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( - containerID, new File(containerPath)); - long blockID = 0L; - assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID()); - assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID()); + try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( + containerID, new File(containerPath))) { + long blockID = 0L; + assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID()); + assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID()); - try { - keyValueBlockIterator.nextBlock(); - } catch (NoSuchElementException ex) { - GenericTestUtils.assertExceptionContains("Block Iterator reached end " + - "for ContainerID " + containerID, ex); + try { + keyValueBlockIterator.nextBlock(); + } catch (NoSuchElementException ex) { + GenericTestUtils.assertExceptionContains("Block Iterator reached end " + + "for ContainerID " + containerID, ex); + } } } @@ -162,42 +164,41 @@ public class TestKeyValueBlockIterator { createContainerWithBlocks(containerID, 2, 0); String containerPath = new File(containerData.getMetadataPath()) .getParent(); - KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( - containerID, new File(containerPath)); - long blockID = 0L; + try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( + containerID, new File(containerPath))) { + long blockID = 0L; - // Even calling multiple times hasNext() should not move entry forward. - assertTrue(keyValueBlockIterator.hasNext()); - assertTrue(keyValueBlockIterator.hasNext()); - assertTrue(keyValueBlockIterator.hasNext()); - assertTrue(keyValueBlockIterator.hasNext()); - assertTrue(keyValueBlockIterator.hasNext()); - assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID()); + // Even calling multiple times hasNext() should not move entry forward. 
+ assertTrue(keyValueBlockIterator.hasNext()); + assertTrue(keyValueBlockIterator.hasNext()); + assertTrue(keyValueBlockIterator.hasNext()); + assertTrue(keyValueBlockIterator.hasNext()); + assertTrue(keyValueBlockIterator.hasNext()); + assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID()); - assertTrue(keyValueBlockIterator.hasNext()); - assertTrue(keyValueBlockIterator.hasNext()); - assertTrue(keyValueBlockIterator.hasNext()); - assertTrue(keyValueBlockIterator.hasNext()); - assertTrue(keyValueBlockIterator.hasNext()); - assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID()); + assertTrue(keyValueBlockIterator.hasNext()); + assertTrue(keyValueBlockIterator.hasNext()); + assertTrue(keyValueBlockIterator.hasNext()); + assertTrue(keyValueBlockIterator.hasNext()); + assertTrue(keyValueBlockIterator.hasNext()); + assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID()); - keyValueBlockIterator.seekToLast(); - assertTrue(keyValueBlockIterator.hasNext()); - assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID()); + keyValueBlockIterator.seekToLast(); + assertTrue(keyValueBlockIterator.hasNext()); + assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID()); - keyValueBlockIterator.seekToFirst(); - blockID = 0L; - assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID()); - assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID()); + keyValueBlockIterator.seekToFirst(); + blockID = 0L; + assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID()); + assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID()); - try { - keyValueBlockIterator.nextBlock(); - } catch (NoSuchElementException ex) { - GenericTestUtils.assertExceptionContains("Block Iterator reached end " + - "for ContainerID " + containerID, ex); + try { + keyValueBlockIterator.nextBlock(); + } catch (NoSuchElementException ex) { + GenericTestUtils.assertExceptionContains("Block Iterator reached end " + + "for ContainerID " + containerID, ex); + } } - - } @Test @@ -208,14 +209,15 @@ public class TestKeyValueBlockIterator { createContainerWithBlocks(containerId, normalBlocks, deletedBlocks); String containerPath = new File(containerData.getMetadataPath()) .getParent(); - KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( + try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( containerId, new File(containerPath), MetadataKeyFilters - .getDeletingKeyFilter()); + .getDeletingKeyFilter())) { - int counter = 5; - while(keyValueBlockIterator.hasNext()) { - BlockData blockData = keyValueBlockIterator.nextBlock(); - assertEquals(blockData.getLocalID(), counter++); + int counter = 5; + while (keyValueBlockIterator.hasNext()) { + BlockData blockData = keyValueBlockIterator.nextBlock(); + assertEquals(blockData.getLocalID(), counter++); + } } } @@ -226,11 +228,12 @@ public class TestKeyValueBlockIterator { createContainerWithBlocks(containerId, 0, 5); String containerPath = new File(containerData.getMetadataPath()) .getParent(); - KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( - containerId, new File(containerPath)); - //As all blocks are deleted blocks, blocks does not match with normal key - // filter. - assertFalse(keyValueBlockIterator.hasNext()); + try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( + containerId, new File(containerPath))) { + //As all blocks are deleted blocks, blocks does not match with normal key + // filter. 
+ assertFalse(keyValueBlockIterator.hasNext()); + } } /** @@ -251,27 +254,30 @@ public class TestKeyValueBlockIterator { container = new KeyValueContainer(containerData, conf); container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID .randomUUID().toString()); - MetadataStore metadataStore = BlockUtils.getDB(containerData, conf); + try(ReferenceCountedDB metadataStore = BlockUtils.getDB(containerData, + conf)) { - List chunkList = new ArrayList<>(); - ChunkInfo info = new ChunkInfo("chunkfile", 0, 1024); - chunkList.add(info.getProtoBufMessage()); + List chunkList = new ArrayList<>(); + ChunkInfo info = new ChunkInfo("chunkfile", 0, 1024); + chunkList.add(info.getProtoBufMessage()); - for (int i=0; i chunkList = new ArrayList<>(); - ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID - .getLocalID(), 0), 0, 1024); - chunkList.add(info.getProtoBufMessage()); - blockData.setChunks(chunkList); - metadataStore.put(Longs.toByteArray(blockID.getLocalID()), blockData - .getProtoBufMessage().toByteArray()); + try(ReferenceCountedDB metadataStore = BlockUtils.getDB(keyValueContainer + .getContainerData(), conf)) { + for (int i = 0; i < count; i++) { + // Creating BlockData + BlockID blockID = new BlockID(containerId, i); + BlockData blockData = new BlockData(blockID); + blockData.addMetadata("VOLUME", "ozone"); + blockData.addMetadata("OWNER", "hdfs"); + List chunkList = new ArrayList<>(); + ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID + .getLocalID(), 0), 0, 1024); + chunkList.add(info.getProtoBufMessage()); + blockData.setChunks(chunkList); + metadataStore.getStore().put(Longs.toByteArray(blockID.getLocalID()), + blockData + .getProtoBufMessage().toByteArray()); + } } - } @SuppressWarnings("RedundantCast") @@ -191,9 +192,12 @@ public class TestKeyValueContainer { int numberOfKeysToWrite = 12; //write one few keys to check the key count after import - MetadataStore metadataStore = BlockUtils.getDB(keyValueContainerData, conf); - for (int i = 0; i < numberOfKeysToWrite; i++) { - metadataStore.put(("test" + i).getBytes(UTF_8), "test".getBytes(UTF_8)); + try(ReferenceCountedDB metadataStore = + BlockUtils.getDB(keyValueContainerData, conf)) { + for (int i = 0; i < numberOfKeysToWrite; i++) { + metadataStore.getStore().put(("test" + i).getBytes(UTF_8), + "test".getBytes(UTF_8)); + } } BlockUtils.removeDB(keyValueContainerData, conf); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java index 0bc1bbc387b..cae275af525 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java @@ -35,7 +35,7 @@ import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingP import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -149,48 +149,50 @@ import static org.junit.Assert.assertTrue; container = new 
KeyValueContainer(containerData, conf); container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID.randomUUID().toString()); - MetadataStore metadataStore = BlockUtils.getDB(containerData, conf); - chunkManager = new ChunkManagerImpl(true); + try (ReferenceCountedDB metadataStore = BlockUtils.getDB(containerData, + conf)) { + chunkManager = new ChunkManagerImpl(true); - assertTrue(containerData.getChunksPath() != null); - File chunksPath = new File(containerData.getChunksPath()); - assertTrue(chunksPath.exists()); - // Initially chunks folder should be empty. - assertTrue(chunksPath.listFiles().length == 0); + assertTrue(containerData.getChunksPath() != null); + File chunksPath = new File(containerData.getChunksPath()); + assertTrue(chunksPath.exists()); + // Initially chunks folder should be empty. + assertTrue(chunksPath.listFiles().length == 0); - List chunkList = new ArrayList<>(); - for (int i = 0; i < (totalBlks); i++) { - BlockID blockID = new BlockID(containerId, i); - BlockData blockData = new BlockData(blockID); + List chunkList = new ArrayList<>(); + for (int i = 0; i < (totalBlks); i++) { + BlockID blockID = new BlockID(containerId, i); + BlockData blockData = new BlockData(blockID); - chunkList.clear(); - for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) { - String chunkName = strBlock + i + strChunk + chunkCount; - long offset = chunkCount * chunkLen; - ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen); - chunkList.add(info.getProtoBufMessage()); - chunkManager - .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData), - new DispatcherContext.Builder() - .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA) - .build()); - chunkManager - .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData), - new DispatcherContext.Builder() - .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA) - .build()); - } - blockData.setChunks(chunkList); + chunkList.clear(); + for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) { + String chunkName = strBlock + i + strChunk + chunkCount; + long offset = chunkCount * chunkLen; + ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen); + chunkList.add(info.getProtoBufMessage()); + chunkManager + .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData), + new DispatcherContext.Builder() + .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA) + .build()); + chunkManager + .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData), + new DispatcherContext.Builder() + .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA) + .build()); + } + blockData.setChunks(chunkList); - if (i >= normalBlocks) { - // deleted key - metadataStore.put(DFSUtil.string2Bytes( - OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID()), - blockData.getProtoBufMessage().toByteArray()); - } else { - // normal key - metadataStore.put(Longs.toByteArray(blockID.getLocalID()), - blockData.getProtoBufMessage().toByteArray()); + if (i >= normalBlocks) { + // deleted key + metadataStore.getStore().put(DFSUtil.string2Bytes( + OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID()), + blockData.getProtoBufMessage().toByteArray()); + } else { + // normal key + metadataStore.getStore().put(Longs.toByteArray(blockID.getLocalID()), + blockData.getProtoBufMessage().toByteArray()); + } } } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java index 5b551199a42..da81e6de454 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java @@ -40,7 +40,7 @@ import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.utils.MetadataKeyFilters; import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; -import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB; import java.io.IOException; import java.io.OutputStream; @@ -119,16 +119,17 @@ public class TestStorageContainerManagerHelper { public List getPendingDeletionBlocks(Long containerID) throws IOException { List pendingDeletionBlocks = Lists.newArrayList(); - MetadataStore meta = getContainerMetadata(containerID); + ReferenceCountedDB meta = getContainerMetadata(containerID); KeyPrefixFilter filter = new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX); - List> kvs = meta + List> kvs = meta.getStore() .getRangeKVs(null, Integer.MAX_VALUE, filter); kvs.forEach(entry -> { String key = DFSUtil.bytes2String(entry.getKey()); pendingDeletionBlocks .add(key.replace(OzoneConsts.DELETING_KEY_PREFIX, "")); }); + meta.close(); return pendingDeletionBlocks; } @@ -143,17 +144,18 @@ public class TestStorageContainerManagerHelper { public List getAllBlocks(Long containeID) throws IOException { List allBlocks = Lists.newArrayList(); - MetadataStore meta = getContainerMetadata(containeID); + ReferenceCountedDB meta = getContainerMetadata(containeID); List> kvs = - meta.getRangeKVs(null, Integer.MAX_VALUE, + meta.getStore().getRangeKVs(null, Integer.MAX_VALUE, MetadataKeyFilters.getNormalKeyFilter()); kvs.forEach(entry -> { allBlocks.add(Longs.fromByteArray(entry.getKey())); }); + meta.close(); return allBlocks; } - private MetadataStore getContainerMetadata(Long containerID) + private ReferenceCountedDB getContainerMetadata(Long containerID) throws IOException { ContainerWithPipeline containerWithPipeline = cluster .getStorageContainerManager().getClientProtocolServer() diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index 0d32f832506..17e199511e7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -953,18 +953,19 @@ public abstract class TestOzoneRpcClientAbstract { .getContainerData()); String containerPath = new File(containerData.getMetadataPath()) .getParent(); - KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( - containerID, new File(containerPath)); - while (keyValueBlockIterator.hasNext()) { - BlockData blockData = keyValueBlockIterator.nextBlock(); - if (blockData.getBlockID().getLocalID() == localID) { - long length = 0; - List chunks = blockData.getChunks(); - for (ContainerProtos.ChunkInfo chunk : chunks) { - length += chunk.getLen(); + try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( + containerID, new 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 0d32f832506..17e199511e7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -953,18 +953,19 @@
         .getContainerData());
     String containerPath = new File(containerData.getMetadataPath())
         .getParent();
-    KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath));
-    while (keyValueBlockIterator.hasNext()) {
-      BlockData blockData = keyValueBlockIterator.nextBlock();
-      if (blockData.getBlockID().getLocalID() == localID) {
-        long length = 0;
-        List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
-        for (ContainerProtos.ChunkInfo chunk : chunks) {
-          length += chunk.getLen();
+    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
+        containerID, new File(containerPath))) {
+      while (keyValueBlockIterator.hasNext()) {
+        BlockData blockData = keyValueBlockIterator.nextBlock();
+        if (blockData.getBlockID().getLocalID() == localID) {
+          long length = 0;
+          List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
+          for (ContainerProtos.ChunkInfo chunk : chunks) {
+            length += chunk.getLen();
+          }
+          Assert.assertEquals(length, keyValue.getBytes().length);
+          break;
         }
-        Assert.assertEquals(length, keyValue.getBytes().length);
-        break;
       }
     }
   }
@@ -1115,31 +1116,32 @@
         (KeyValueContainerData) container.getContainerData();
     String containerPath =
         new File(containerData.getMetadataPath()).getParent();
-    KeyValueBlockIterator keyValueBlockIterator =
-        new KeyValueBlockIterator(containerID, new File(containerPath));
+    try (KeyValueBlockIterator keyValueBlockIterator =
+        new KeyValueBlockIterator(containerID, new File(containerPath))) {
 
-    // Find the block corresponding to the key we put. We use the localID of
-    // the BlockData to identify out key.
-    BlockData blockData = null;
-    while (keyValueBlockIterator.hasNext()) {
-      blockData = keyValueBlockIterator.nextBlock();
-      if (blockData.getBlockID().getLocalID() == localID) {
-        break;
+      // Find the block corresponding to the key we put. We use the localID of
+      // the BlockData to identify our key.
+      BlockData blockData = null;
+      while (keyValueBlockIterator.hasNext()) {
+        blockData = keyValueBlockIterator.nextBlock();
+        if (blockData.getBlockID().getLocalID() == localID) {
+          break;
+        }
       }
+      Assert.assertNotNull("Block not found", blockData);
+
+      // Get the location of the chunk file
+      String chunkName = blockData.getChunks().get(0).getChunkName();
+      String containreBaseDir =
+          container.getContainerData().getVolume().getHddsRootDir().getPath();
+      File chunksLocationPath = KeyValueContainerLocationUtil
+          .getChunksLocationPath(containreBaseDir, scmId, containerID);
+      File chunkFile = new File(chunksLocationPath, chunkName);
+
+      // Corrupt the contents of the chunk file
+      String newData = new String("corrupted data");
+      FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes());
     }
-    Assert.assertNotNull("Block not found", blockData);
-
-    // Get the location of the chunk file
-    String chunkName = blockData.getChunks().get(0).getChunkName();
-    String containreBaseDir =
-        container.getContainerData().getVolume().getHddsRootDir().getPath();
-    File chunksLocationPath = KeyValueContainerLocationUtil
-        .getChunksLocationPath(containreBaseDir, scmId, containerID);
-    File chunkFile = new File(chunksLocationPath, chunkName);
-
-    // Corrupt the contents of the chunk file
-    String newData = new String("corrupted data");
-    FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes());
   }
 
   @Test
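
KeyValueBlockIterator keeps its own handle on the container metadata, which is why these hunks wrap it in try-with-resources. A condensed sketch of the same lookup as a standalone helper; the method name blockLength and its error handling are illustrative, not part of the patch (imports as in the file above):

    // Sketch: sum chunk lengths for one block, closing the iterator on exit.
    long blockLength(long containerID, File containerDir, long localID)
        throws IOException {
      try (KeyValueBlockIterator it =
          new KeyValueBlockIterator(containerID, containerDir)) {
        while (it.hasNext()) {
          BlockData blockData = it.nextBlock();
          if (blockData.getBlockID().getLocalID() != localID) {
            continue;                     // not the block we wrote
          }
          long length = 0;
          for (ContainerProtos.ChunkInfo chunk : blockData.getChunks()) {
            length += chunk.getLen();     // logical length, not on-disk size
          }
          return length;
        }
      } // iterator (and its DB handle) is closed here on every path
      throw new IOException("Block " + localID + " not found");
    }
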
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 27fe4ffedd0..9993f90ca27 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.utils.BackgroundService;
 import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
@@ -119,35 +119,36 @@ public class TestBlockDeletingService {
       containerSet.addContainer(container);
       data = (KeyValueContainerData) containerSet.getContainer(
           containerID).getContainerData();
-      MetadataStore metadata = BlockUtils.getDB(data, conf);
-      for (int j = 0; j < numOfBlocksPerContainer; j++) {
-        BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
-        String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
-            blockID.getLocalID();
-        BlockData kd = new BlockData(blockID);
-        List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
-        for (int k = 0; k < numOfChunksPerBlock; k++) {
-          // offset doesn't matter here
-          String chunkName = blockID.getLocalID() + "_chunk_" + k;
-          File chunk = new File(data.getChunksPath(), chunkName);
-          FileUtils.writeStringToFile(chunk, "a chunk",
-              Charset.defaultCharset());
-          LOG.info("Creating file {}", chunk.getAbsolutePath());
-          // make sure file exists
-          Assert.assertTrue(chunk.isFile() && chunk.exists());
-          ContainerProtos.ChunkInfo info =
-              ContainerProtos.ChunkInfo.newBuilder()
-                  .setChunkName(chunk.getAbsolutePath())
-                  .setLen(0)
-                  .setOffset(0)
-                  .setChecksumData(Checksum.getNoChecksumDataProto())
-                  .build();
-          chunks.add(info);
+      try(ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+        for (int j = 0; j < numOfBlocksPerContainer; j++) {
+          BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+          String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
+              blockID.getLocalID();
+          BlockData kd = new BlockData(blockID);
+          List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+          for (int k = 0; k < numOfChunksPerBlock; k++) {
+            // offset doesn't matter here
+            String chunkName = blockID.getLocalID() + "_chunk_" + k;
+            File chunk = new File(data.getChunksPath(), chunkName);
+            FileUtils.writeStringToFile(chunk, "a chunk",
+                Charset.defaultCharset());
+            LOG.info("Creating file {}", chunk.getAbsolutePath());
+            // make sure file exists
+            Assert.assertTrue(chunk.isFile() && chunk.exists());
+            ContainerProtos.ChunkInfo info =
+                ContainerProtos.ChunkInfo.newBuilder()
+                    .setChunkName(chunk.getAbsolutePath())
+                    .setLen(0)
+                    .setOffset(0)
+                    .setChecksumData(Checksum.getNoChecksumDataProto())
+                    .build();
+            chunks.add(info);
+          }
+          kd.setChunks(chunks);
+          metadata.getStore().put(DFSUtil.string2Bytes(deleteStateName),
+              kd.getProtoBufMessage().toByteArray());
         }
-        kd.setChunks(chunks);
-        metadata.put(DFSUtil.string2Bytes(deleteStateName),
-            kd.getProtoBufMessage().toByteArray());
       }
     }
   }
@@ -166,17 +167,19 @@ public class TestBlockDeletingService {
    * Get under deletion blocks count from DB,
    * note this info is parsed from container.db.
    */
-  private int getUnderDeletionBlocksCount(MetadataStore meta)
+  private int getUnderDeletionBlocksCount(ReferenceCountedDB meta)
       throws IOException {
     List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
-        meta.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter()
+        meta.getStore().getRangeKVs(null, 100,
+            new MetadataKeyFilters.KeyPrefixFilter()
             .addFilter(OzoneConsts.DELETING_KEY_PREFIX));
     return underDeletionBlocks.size();
   }
 
-  private int getDeletedBlocksCount(MetadataStore db) throws IOException {
+  private int getDeletedBlocksCount(ReferenceCountedDB db) throws IOException {
     List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
-        db.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter()
+        db.getStore().getRangeKVs(null, 100,
+            new MetadataKeyFilters.KeyPrefixFilter()
            .addFilter(OzoneConsts.DELETED_KEY_PREFIX));
     return underDeletionBlocks.size();
  }
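
The two counters above differ only in the key prefix they scan for. A generalized form, sketched here with an invented helper name and the same 100-entry cap the tests use (imports as in the file above):

    // Sketch: count keys in a container DB that carry a given state prefix.
    private static int countKeysWithPrefix(ReferenceCountedDB db, String prefix)
        throws IOException {
      List<Map.Entry<byte[], byte[]>> hits = db.getStore().getRangeKVs(
          null,                                  // start from the first key
          100,                                   // scan cap, as in the tests
          new MetadataKeyFilters.KeyPrefixFilter().addFilter(prefix));
      return hits.size();
    }

    // e.g. countKeysWithPrefix(meta, OzoneConsts.DELETING_KEY_PREFIX)
    // or   countKeysWithPrefix(meta, OzoneConsts.DELETED_KEY_PREFIX)
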
@@ -202,37 +205,38 @@ public class TestBlockDeletingService {
     containerSet.listContainer(0L, 1, containerData);
     Assert.assertEquals(1, containerData.size());
 
-    MetadataStore meta = BlockUtils.getDB(
-        (KeyValueContainerData) containerData.get(0), conf);
-    Map<Long, Container> containerMap = containerSet.getContainerMapCopy();
-    // NOTE: this test assumes that all the container is KetValueContainer and
-    // have DeleteTransactionId in KetValueContainerData. If other
-    // types is going to be added, this test should be checked.
-    long transactionId = ((KeyValueContainerData)containerMap
-        .get(containerData.get(0).getContainerID()).getContainerData())
-        .getDeleteTransactionId();
+    try(ReferenceCountedDB meta = BlockUtils.getDB(
+        (KeyValueContainerData) containerData.get(0), conf)) {
+      Map<Long, Container> containerMap = containerSet.getContainerMapCopy();
+      // NOTE: this test assumes that all the containers are KeyValueContainers
+      // and have a DeleteTransactionId in KeyValueContainerData. If other
+      // types are going to be added, this test should be checked.
+      long transactionId = ((KeyValueContainerData) containerMap
+          .get(containerData.get(0).getContainerID()).getContainerData())
+          .getDeleteTransactionId();
 
-    // Number of deleted blocks in container should be equal to 0 before
-    // block delete
-    Assert.assertEquals(0, transactionId);
+      // Number of deleted blocks in container should be equal to 0 before
+      // block delete
+      Assert.assertEquals(0, transactionId);
 
-    // Ensure there are 3 blocks under deletion and 0 deleted blocks
-    Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
-    Assert.assertEquals(0, getDeletedBlocksCount(meta));
+      // Ensure there are 3 blocks under deletion and 0 deleted blocks
+      Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
+      Assert.assertEquals(0, getDeletedBlocksCount(meta));
 
-    // An interval will delete 1 * 2 blocks
-    deleteAndWait(svc, 1);
-    Assert.assertEquals(1, getUnderDeletionBlocksCount(meta));
-    Assert.assertEquals(2, getDeletedBlocksCount(meta));
+      // An interval will delete 1 * 2 blocks
+      deleteAndWait(svc, 1);
+      Assert.assertEquals(1, getUnderDeletionBlocksCount(meta));
+      Assert.assertEquals(2, getDeletedBlocksCount(meta));
 
-    deleteAndWait(svc, 2);
-    Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
-    Assert.assertEquals(3, getDeletedBlocksCount(meta));
+      deleteAndWait(svc, 2);
+      Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
+      Assert.assertEquals(3, getDeletedBlocksCount(meta));
 
-    deleteAndWait(svc, 3);
-    Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
-    Assert.assertEquals(3, getDeletedBlocksCount(meta));
+      deleteAndWait(svc, 3);
+      Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
+      Assert.assertEquals(3, getDeletedBlocksCount(meta));
+    }
 
     svc.shutdown();
   }
@@ -311,25 +315,26 @@ public class TestBlockDeletingService {
     // get container meta data
     List<ContainerData> containerData = Lists.newArrayList();
     containerSet.listContainer(0L, 1, containerData);
-    MetadataStore meta = BlockUtils.getDB(
-        (KeyValueContainerData) containerData.get(0), conf);
+    try(ReferenceCountedDB meta = BlockUtils.getDB(
+        (KeyValueContainerData) containerData.get(0), conf)) {
 
-    LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
-    GenericTestUtils.waitFor(() -> {
-      try {
-        if (getUnderDeletionBlocksCount(meta) == 0) {
-          return true;
+      LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          if (getUnderDeletionBlocksCount(meta) == 0) {
+            return true;
+          }
+        } catch (IOException ignored) {
        }
-      } catch (IOException ignored) {
-      }
-      return false;
-    }, 1000, 100000);
-    newLog.stopCapturing();
+        return false;
+      }, 1000, 100000);
+      newLog.stopCapturing();
 
-    // The block deleting successfully and shouldn't catch timed
-    // out warning log.
+      // The blocks should be deleted successfully, and we shouldn't see the
+      // timed out warning log.
+      Assert.assertTrue(!newLog.getOutput().contains(
+          "Background task executes timed out, retrying in next interval"));
+    }
     svc.shutdown();
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 2fd169c03c3..f43caeeda2b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -202,7 +202,7 @@ public class TestContainerPersistence {
     Path meta = kvData.getDbFile().toPath().getParent();
     Assert.assertTrue(meta != null && Files.exists(meta));
 
-    MetadataStore store = null;
+    ReferenceCountedDB store = null;
     try {
       store = BlockUtils.getDB(kvData, conf);
       Assert.assertNotNull(store);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 14db90d8cdb..cbb83eaadd0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -50,7 +50,7 @@ import org.apache.hadoop.ozone.ozShell.TestOzoneShell;
 import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -300,9 +300,12 @@
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer().getContainerSet();
     OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
-      MetadataStore db = BlockUtils.getDB((KeyValueContainerData) dnContainerSet
-          .getContainer(blockID.getContainerID()).getContainerData(), conf);
-      Assert.assertNotNull(db.get(Longs.toByteArray(blockID.getLocalID())));
+      try(ReferenceCountedDB db =
+          BlockUtils.getDB((KeyValueContainerData) dnContainerSet
+          .getContainer(blockID.getContainerID()).getContainerData(), conf)) {
+        Assert.assertNotNull(db.getStore().get(
+            Longs.toByteArray(blockID.getLocalID())));
+      }
     }, omKeyLocationInfoGroups);
   }
 
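The verification hunks in TestBlockDeletion (above and below) probe the container DB for the three key states a block can be in. The same probes, condensed into one illustrative helper; the name blockState and the string results are ours, while the prefixes and accessors come from the patch (imports as in the file above):

    // Sketch: classify a block's deletion state by probing the container DB.
    static String blockState(ReferenceCountedDB db, long localID)
        throws IOException {
      if (db.getStore().get(Longs.toByteArray(localID)) != null) {
        return "LIVE";                    // plain key: block still present
      }
      if (db.getStore().get(DFSUtil.string2Bytes(
          OzoneConsts.DELETING_KEY_PREFIX + localID)) != null) {
        return "DELETING";                // queued for the deleting service
      }
      if (db.getStore().get(DFSUtil.string2Bytes(
          OzoneConsts.DELETED_KEY_PREFIX + localID)) != null) {
        return "DELETED";                 // chunks gone, tombstone kept
      }
      return "UNKNOWN";
    }
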
@@ -312,13 +315,16 @@
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer().getContainerSet();
     OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
-      MetadataStore db = BlockUtils.getDB((KeyValueContainerData) dnContainerSet
-          .getContainer(blockID.getContainerID()).getContainerData(), conf);
-      Assert.assertNull(db.get(Longs.toByteArray(blockID.getLocalID())));
-      Assert.assertNull(db.get(DFSUtil.string2Bytes(
-          OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID())));
-      Assert.assertNotNull(DFSUtil
-          .string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blockID.getLocalID()));
+      try(ReferenceCountedDB db =
+          BlockUtils.getDB((KeyValueContainerData) dnContainerSet
+          .getContainer(blockID.getContainerID()).getContainerData(), conf)) {
+        Assert.assertNull(db.getStore().get(
+            Longs.toByteArray(blockID.getLocalID())));
+        Assert.assertNull(db.getStore().get(DFSUtil.string2Bytes(
+            OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID())));
+        Assert.assertNotNull(db.getStore().get(DFSUtil.string2Bytes(
+            OzoneConsts.DELETED_KEY_PREFIX + blockID.getLocalID())));
+      }
       containerIdsWithDeletedBlocks.add(blockID.getContainerID());
     }, omKeyLocationInfoGroups);
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index 4a86f440170..e384d71f609 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -226,7 +226,7 @@ public class TestCloseContainerByPipeline {
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(3, datanodes.size());
 
-    List<MetadataStore> metadataStores = new ArrayList<>(datanodes.size());
+    List<ReferenceCountedDB> metadataStores = new ArrayList<>(datanodes.size());
     for (DatanodeDetails details : datanodes) {
       Assert.assertFalse(isContainerClosed(cluster, containerID, details));
       //send the order to close the container
@@ -237,8 +237,10 @@ public class TestCloseContainerByPipeline {
       Container dnContainer = cluster.getHddsDatanodes().get(index)
           .getDatanodeStateMachine().getContainer().getContainerSet()
           .getContainer(containerID);
-      metadataStores.add(BlockUtils.getDB((KeyValueContainerData) dnContainer
-          .getContainerData(), conf));
+      try(ReferenceCountedDB store = BlockUtils.getDB(
+          (KeyValueContainerData) dnContainer.getContainerData(), conf)) {
+        metadataStores.add(store);
+      }
     }
 
     // There should be as many rocks db as the number of datanodes in pipeline.