HDDS-1449. JVM Exit in datanode while committing a key. Contributed by Mukul Kumar Singh. (#825)

This commit is contained in:
Mukul Kumar Singh 2019-05-22 17:18:40 +05:30 committed by bshashikant
parent 67f9a7b165
commit 2fc6f8599a
19 changed files with 688 additions and 575 deletions

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -198,52 +198,54 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
}
int newDeletionBlocks = 0;
MetadataStore containerDB = BlockUtils.getDB(containerData, conf);
for (Long blk : delTX.getLocalIDList()) {
BatchOperation batch = new BatchOperation();
byte[] blkBytes = Longs.toByteArray(blk);
byte[] blkInfo = containerDB.get(blkBytes);
if (blkInfo != null) {
byte[] deletingKeyBytes =
DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk);
byte[] deletedKeyBytes =
DFSUtil.string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blk);
if (containerDB.get(deletingKeyBytes) != null
|| containerDB.get(deletedKeyBytes) != null) {
LOG.debug(String.format(
"Ignoring delete for block %d in container %d."
+ " Entry already added.", blk, containerId));
continue;
try(ReferenceCountedDB containerDB =
BlockUtils.getDB(containerData, conf)) {
for (Long blk : delTX.getLocalIDList()) {
BatchOperation batch = new BatchOperation();
byte[] blkBytes = Longs.toByteArray(blk);
byte[] blkInfo = containerDB.getStore().get(blkBytes);
if (blkInfo != null) {
byte[] deletingKeyBytes =
DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk);
byte[] deletedKeyBytes =
DFSUtil.string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blk);
if (containerDB.getStore().get(deletingKeyBytes) != null
|| containerDB.getStore().get(deletedKeyBytes) != null) {
LOG.debug(String.format(
"Ignoring delete for block %d in container %d."
+ " Entry already added.", blk, containerId));
continue;
}
// Found the block in container db,
// use an atomic update to change its state to deleting.
batch.put(deletingKeyBytes, blkInfo);
batch.delete(blkBytes);
try {
containerDB.getStore().writeBatch(batch);
newDeletionBlocks++;
LOG.debug("Transited Block {} to DELETING state in container {}",
blk, containerId);
} catch (IOException e) {
// if some blocks failed to delete, we fail this TX,
// without sending this ACK to SCM, SCM will resend the TX
// with a certain number of retries.
throw new IOException(
"Failed to delete blocks for TXID = " + delTX.getTxID(), e);
}
} else {
LOG.debug("Block {} not found or already under deletion in"
+ " container {}, skip deleting it.", blk, containerId);
}
// Found the block in container db,
// use an atomic update to change its state to deleting.
batch.put(deletingKeyBytes, blkInfo);
batch.delete(blkBytes);
try {
containerDB.writeBatch(batch);
newDeletionBlocks++;
LOG.debug("Transited Block {} to DELETING state in container {}",
blk, containerId);
} catch (IOException e) {
// if some blocks failed to delete, we fail this TX,
// without sending this ACK to SCM, SCM will resend the TX
// with a certain number of retries.
throw new IOException(
"Failed to delete blocks for TXID = " + delTX.getTxID(), e);
}
} else {
LOG.debug("Block {} not found or already under deletion in"
+ " container {}, skip deleting it.", blk, containerId);
}
}
containerDB
.put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX),
Longs.toByteArray(delTX.getTxID()));
containerData
.updateDeleteTransactionId(delTX.getTxID());
// update pending deletion blocks count in in-memory container status
containerData.incrPendingDeletionBlocks(newDeletionBlocks);
containerDB.getStore()
.put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX),
Longs.toByteArray(delTX.getTxID()));
containerData
.updateDeleteTransactionId(delTX.getTxID());
// update pending deletion blocks count in in-memory container status
containerData.incrPendingDeletionBlocks(newDeletionBlocks);
}
}
@Override

View File

@ -28,8 +28,11 @@ import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -92,8 +95,8 @@ public final class ContainerCache extends LRUMap {
MapIterator iterator = cache.mapIterator();
while (iterator.hasNext()) {
iterator.next();
MetadataStore db = (MetadataStore) iterator.getValue();
closeDB((String)iterator.getKey(), db);
ReferenceCountedDB db = (ReferenceCountedDB) iterator.getValue();
db.setEvicted(true);
}
// reset the cache
cache.clear();
@ -107,11 +110,11 @@ public final class ContainerCache extends LRUMap {
*/
@Override
protected boolean removeLRU(LinkEntry entry) {
MetadataStore db = (MetadataStore) entry.getValue();
ReferenceCountedDB db = (ReferenceCountedDB) entry.getValue();
String dbFile = (String)entry.getKey();
lock.lock();
try {
closeDB(dbFile, db);
db.setEvicted(false);
return true;
} catch (Exception e) {
LOG.error("Eviction for db:{} failed", dbFile, e);
@ -128,26 +131,30 @@ public final class ContainerCache extends LRUMap {
* @param containerDBType - DB type of the container.
* @param containerDBPath - DB path of the container.
* @param conf - Hadoop Configuration.
* @return MetadataStore.
* @return ReferenceCountedDB.
*/
public MetadataStore getDB(long containerID, String containerDBType,
public ReferenceCountedDB getDB(long containerID, String containerDBType,
String containerDBPath, Configuration conf)
throws IOException {
Preconditions.checkState(containerID >= 0,
"Container ID cannot be negative.");
lock.lock();
try {
MetadataStore db = (MetadataStore) this.get(containerDBPath);
ReferenceCountedDB db = (ReferenceCountedDB) this.get(containerDBPath);
if (db == null) {
db = MetadataStoreBuilder.newBuilder()
MetadataStore metadataStore =
MetadataStoreBuilder.newBuilder()
.setDbFile(new File(containerDBPath))
.setCreateIfMissing(false)
.setConf(conf)
.setDBType(containerDBType)
.build();
db = new ReferenceCountedDB(metadataStore, containerDBPath);
this.put(containerDBPath, db);
}
// increment the reference before returning the object
db.incrementReference();
return db;
} catch (Exception e) {
LOG.error("Error opening DB. Container:{} ContainerPath:{}",
@ -161,16 +168,70 @@ public final class ContainerCache extends LRUMap {
/**
* Remove a DB handler from cache.
*
* @param containerPath - path of the container db file.
* @param containerDBPath - path of the container db file.
*/
public void removeDB(String containerPath) {
public void removeDB(String containerDBPath) {
lock.lock();
try {
MetadataStore db = (MetadataStore)this.get(containerPath);
closeDB(containerPath, db);
this.remove(containerPath);
ReferenceCountedDB db = (ReferenceCountedDB)this.get(containerDBPath);
if (db != null) {
// marking it as evicted will close the db as well.
db.setEvicted(true);
}
this.remove(containerDBPath);
} finally {
lock.unlock();
}
}
/**
* Class to implement reference counting over instances handed by Container
* Cache.
*/
public class ReferenceCountedDB implements Closeable {
private final AtomicInteger referenceCount;
private final AtomicBoolean isEvicted;
private final MetadataStore store;
private final String containerDBPath;
public ReferenceCountedDB(MetadataStore store, String containerDBPath) {
this.referenceCount = new AtomicInteger(0);
this.isEvicted = new AtomicBoolean(false);
this.store = store;
this.containerDBPath = containerDBPath;
}
private void incrementReference() {
this.referenceCount.incrementAndGet();
}
private void decrementReference() {
this.referenceCount.decrementAndGet();
cleanup();
}
private void setEvicted(boolean checkNoReferences) {
Preconditions.checkState(!checkNoReferences ||
(referenceCount.get() == 0),
"checkNoReferences:%b, referencount:%d",
checkNoReferences, referenceCount.get());
isEvicted.set(true);
cleanup();
}
private void cleanup() {
if (referenceCount.get() == 0 && isEvicted.get() && store != null) {
closeDB(containerDBPath, store);
}
}
public MetadataStore getStore() {
return store;
}
public void close() {
decrementReference();
}
}
}

View File

@ -31,11 +31,12 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocat
import org.apache.hadoop.utils.MetaStoreIterator;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.apache.hadoop.utils.MetadataStore.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.NoSuchElementException;
@ -48,12 +49,14 @@ import java.util.NoSuchElementException;
* {@link MetadataKeyFilters#getNormalKeyFilter()}
*/
@InterfaceAudience.Public
public class KeyValueBlockIterator implements BlockIterator<BlockData> {
public class KeyValueBlockIterator implements BlockIterator<BlockData>,
Closeable {
private static final Logger LOG = LoggerFactory.getLogger(
KeyValueBlockIterator.class);
private MetaStoreIterator<KeyValue> blockIterator;
private final ReferenceCountedDB db;
private static KeyPrefixFilter defaultBlockFilter = MetadataKeyFilters
.getNormalKeyFilter();
private KeyPrefixFilter blockFilter;
@ -91,9 +94,9 @@ public class KeyValueBlockIterator implements BlockIterator<BlockData> {
containerData;
keyValueContainerData.setDbFile(KeyValueContainerLocationUtil
.getContainerDBFile(metdataPath, containerId));
MetadataStore metadataStore = BlockUtils.getDB(keyValueContainerData, new
db = BlockUtils.getDB(keyValueContainerData, new
OzoneConfiguration());
blockIterator = metadataStore.iterator();
blockIterator = db.getStore().iterator();
blockFilter = filter;
}
@ -145,4 +148,8 @@ public class KeyValueBlockIterator implements BlockIterator<BlockData> {
nextBlock = null;
blockIterator.seekToLast();
}
public void close() {
db.close();
}
}

View File

@ -54,7 +54,6 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers
.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.utils.MetadataStore;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
@ -74,6 +73,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.UNSUPPORTED_REQUEST;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -349,11 +349,12 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
void compactDB() throws StorageContainerException {
try {
MetadataStore db = BlockUtils.getDB(containerData, config);
db.compactDB();
LOG.info("Container {} is closed with bcsId {}.",
containerData.getContainerID(),
containerData.getBlockCommitSequenceId());
try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
db.getStore().compactDB();
LOG.info("Container {} is closed with bcsId {}.",
containerData.getContainerID(),
containerData.getBlockCommitSequenceId());
}
} catch (StorageContainerException ex) {
throw ex;
} catch (IOException ex) {

View File

@ -30,12 +30,12 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.utils.MetadataStore;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -236,41 +236,42 @@ public class KeyValueContainerCheck {
onDiskContainerData.setDbFile(dbFile);
MetadataStore db = BlockUtils
.getDB(onDiskContainerData, checkConfig);
iterateBlockDB(db);
try(ReferenceCountedDB db =
BlockUtils.getDB(onDiskContainerData, checkConfig)) {
iterateBlockDB(db);
}
}
private void iterateBlockDB(MetadataStore db)
private void iterateBlockDB(ReferenceCountedDB db)
throws IOException {
Preconditions.checkState(db != null);
// get "normal" keys from the Block DB
KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
new File(onDiskContainerData.getContainerPath()));
try(KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
new File(onDiskContainerData.getContainerPath()))) {
// ensure there is a chunk file for each key in the DB
while (kvIter.hasNext()) {
BlockData block = kvIter.nextBlock();
// ensure there is a chunk file for each key in the DB
while (kvIter.hasNext()) {
BlockData block = kvIter.nextBlock();
List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
File chunkFile;
chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
ChunkInfo.getFromProtoBuf(chunk));
List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
File chunkFile;
chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
ChunkInfo.getFromProtoBuf(chunk));
if (!chunkFile.exists()) {
// concurrent mutation in Block DB? lookup the block again.
byte[] bdata = db.get(
Longs.toByteArray(block.getBlockID().getLocalID()));
if (bdata == null) {
LOG.trace("concurrency with delete, ignoring deleted block");
break; // skip to next block from kvIter
} else {
String errorStr = "Missing chunk file "
+ chunkFile.getAbsolutePath();
throw new IOException(errorStr);
if (!chunkFile.exists()) {
// concurrent mutation in Block DB? lookup the block again.
byte[] bdata = db.getStore().get(
Longs.toByteArray(block.getBlockID().getLocalID()));
if (bdata == null) {
LOG.trace("concurrency with delete, ignoring deleted block");
break; // skip to next block from kvIter
} else {
String errorStr = "Missing chunk file "
+ chunkFile.getAbsolutePath();
throw new IOException(errorStr);
}
}
}
}

View File

@ -37,7 +37,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import java.io.IOException;
@ -66,7 +66,7 @@ public final class BlockUtils {
* @return MetadataStore handle.
* @throws StorageContainerException
*/
public static MetadataStore getDB(KeyValueContainerData containerData,
public static ReferenceCountedDB getDB(KeyValueContainerData containerData,
Configuration conf) throws
StorageContainerException {
Preconditions.checkNotNull(containerData);

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.utils.MetadataStoreBuilder;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -174,22 +175,25 @@ public final class KeyValueContainerUtil {
}
kvContainerData.setDbFile(dbFile);
MetadataStore metadata = BlockUtils.getDB(kvContainerData, config);
long bytesUsed = 0;
List<Map.Entry<byte[], byte[]>> liveKeys = metadata
.getRangeKVs(null, Integer.MAX_VALUE,
MetadataKeyFilters.getNormalKeyFilter());
bytesUsed = liveKeys.parallelStream().mapToLong(e-> {
BlockData blockData;
try {
blockData = BlockUtils.getBlockData(e.getValue());
return blockData.getSize();
} catch (IOException ex) {
return 0L;
}
}).sum();
kvContainerData.setBytesUsed(bytesUsed);
kvContainerData.setKeyCount(liveKeys.size());
try(ReferenceCountedDB metadata =
BlockUtils.getDB(kvContainerData, config)) {
long bytesUsed = 0;
List<Map.Entry<byte[], byte[]>> liveKeys = metadata.getStore()
.getRangeKVs(null, Integer.MAX_VALUE,
MetadataKeyFilters.getNormalKeyFilter());
bytesUsed = liveKeys.parallelStream().mapToLong(e-> {
BlockData blockData;
try {
blockData = BlockUtils.getBlockData(e.getValue());
return blockData.getSize();
} catch (IOException ex) {
return 0L;
}
}).sum();
kvContainerData.setBytesUsed(bytesUsed);
kvContainerData.setKeyCount(liveKeys.size());
}
}
/**

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -84,47 +84,47 @@ public class BlockManagerImpl implements BlockManager {
"cannot be negative");
// We are not locking the key manager since LevelDb serializes all actions
// against a single DB. We rely on DB level locking to avoid conflicts.
MetadataStore db = BlockUtils.getDB((KeyValueContainerData) container
.getContainerData(), config);
try(ReferenceCountedDB db = BlockUtils.
getDB((KeyValueContainerData) container.getContainerData(), config)) {
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
long bcsId = data.getBlockCommitSequenceId();
long containerBCSId = ((KeyValueContainerData) container.
getContainerData()).getBlockCommitSequenceId();
long bcsId = data.getBlockCommitSequenceId();
long containerBCSId = ((KeyValueContainerData) container.getContainerData())
.getBlockCommitSequenceId();
// default blockCommitSequenceId for any block is 0. It the putBlock
// request is not coming via Ratis(for test scenarios), it will be 0.
// In such cases, we should overwrite the block as well
if (bcsId != 0) {
if (bcsId <= containerBCSId) {
// Since the blockCommitSequenceId stored in the db is greater than
// equal to blockCommitSequenceId to be updated, it means the putBlock
// transaction is reapplied in the ContainerStateMachine on restart.
// It also implies that the given block must already exist in the db.
// just log and return
LOG.warn("blockCommitSequenceId " + containerBCSId
+ " in the Container Db is greater than" + " the supplied value "
+ bcsId + " .Ignoring it");
return data.getSize();
// default blockCommitSequenceId for any block is 0. It the putBlock
// request is not coming via Ratis(for test scenarios), it will be 0.
// In such cases, we should overwrite the block as well
if (bcsId != 0) {
if (bcsId <= containerBCSId) {
// Since the blockCommitSequenceId stored in the db is greater than
// equal to blockCommitSequenceId to be updated, it means the putBlock
// transaction is reapplied in the ContainerStateMachine on restart.
// It also implies that the given block must already exist in the db.
// just log and return
LOG.warn("blockCommitSequenceId " + containerBCSId
+ " in the Container Db is greater than" + " the supplied value "
+ bcsId + " .Ignoring it");
return data.getSize();
}
}
// update the blockData as well as BlockCommitSequenceId here
BatchOperation batch = new BatchOperation();
batch.put(Longs.toByteArray(data.getLocalID()),
data.getProtoBufMessage().toByteArray());
batch.put(blockCommitSequenceIdKey,
Longs.toByteArray(bcsId));
db.getStore().writeBatch(batch);
container.updateBlockCommitSequenceId(bcsId);
// Increment keycount here
container.getContainerData().incrKeyCount();
LOG.debug(
"Block " + data.getBlockID() + " successfully committed with bcsId "
+ bcsId + " chunk size " + data.getChunks().size());
return data.getSize();
}
// update the blockData as well as BlockCommitSequenceId here
BatchOperation batch = new BatchOperation();
batch.put(Longs.toByteArray(data.getLocalID()),
data.getProtoBufMessage().toByteArray());
batch.put(blockCommitSequenceIdKey,
Longs.toByteArray(bcsId));
db.writeBatch(batch);
container.updateBlockCommitSequenceId(bcsId);
// Increment keycount here
container.getContainerData().incrKeyCount();
LOG.debug(
"Block " + data.getBlockID() + " successfully committed with bcsId "
+ bcsId + " chunk size " + data.getChunks().size());
return data.getSize();
}
/**
@ -146,32 +146,33 @@ public class BlockManagerImpl implements BlockManager {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
MetadataStore db = BlockUtils.getDB(containerData, config);
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
long containerBCSId = containerData.getBlockCommitSequenceId();
if (containerBCSId < bcsId) {
throw new StorageContainerException(
"Unable to find the block with bcsID " + bcsId + " .Container "
+ container.getContainerData().getContainerID() + " bcsId is "
+ containerBCSId + ".", UNKNOWN_BCSID);
long containerBCSId = containerData.getBlockCommitSequenceId();
if (containerBCSId < bcsId) {
throw new StorageContainerException(
"Unable to find the block with bcsID " + bcsId + " .Container "
+ container.getContainerData().getContainerID() + " bcsId is "
+ containerBCSId + ".", UNKNOWN_BCSID);
}
byte[] kData = db.getStore().get(Longs.toByteArray(blockID.getLocalID()));
if (kData == null) {
throw new StorageContainerException("Unable to find the block." +
blockID, NO_SUCH_BLOCK);
}
ContainerProtos.BlockData blockData =
ContainerProtos.BlockData.parseFrom(kData);
long id = blockData.getBlockID().getBlockCommitSequenceId();
if (id < bcsId) {
throw new StorageContainerException(
"bcsId " + bcsId + " mismatches with existing block Id "
+ id + " for block " + blockID + ".", BCSID_MISMATCH);
}
return BlockData.getFromProtoBuf(blockData);
}
byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
if (kData == null) {
throw new StorageContainerException("Unable to find the block." + blockID,
NO_SUCH_BLOCK);
}
ContainerProtos.BlockData blockData =
ContainerProtos.BlockData.parseFrom(kData);
long id = blockData.getBlockID().getBlockCommitSequenceId();
if (id < bcsId) {
throw new StorageContainerException(
"bcsId " + bcsId + " mismatches with existing block Id "
+ id + " for block " + blockID + ".", BCSID_MISMATCH);
}
return BlockData.getFromProtoBuf(blockData);
}
/**
@ -187,18 +188,19 @@ public class BlockManagerImpl implements BlockManager {
throws IOException {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
MetadataStore db = BlockUtils.getDB(containerData, config);
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
if (kData == null) {
throw new StorageContainerException("Unable to find the block.",
NO_SUCH_BLOCK);
try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
byte[] kData = db.getStore().get(Longs.toByteArray(blockID.getLocalID()));
if (kData == null) {
throw new StorageContainerException("Unable to find the block.",
NO_SUCH_BLOCK);
}
ContainerProtos.BlockData blockData =
ContainerProtos.BlockData.parseFrom(kData);
return blockData.getSize();
}
ContainerProtos.BlockData blockData =
ContainerProtos.BlockData.parseFrom(kData);
return blockData.getSize();
}
/**
@ -218,24 +220,24 @@ public class BlockManagerImpl implements BlockManager {
KeyValueContainerData cData = (KeyValueContainerData) container
.getContainerData();
MetadataStore db = BlockUtils.getDB(cData, config);
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
// Note : There is a race condition here, since get and delete
// are not atomic. Leaving it here since the impact is refusing
// to delete a Block which might have just gotten inserted after
// the get check.
byte[] kKey = Longs.toByteArray(blockID.getLocalID());
byte[] kData = db.get(kKey);
if (kData == null) {
throw new StorageContainerException("Unable to find the block.",
NO_SUCH_BLOCK);
try(ReferenceCountedDB db = BlockUtils.getDB(cData, config)) {
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
// Note : There is a race condition here, since get and delete
// are not atomic. Leaving it here since the impact is refusing
// to delete a Block which might have just gotten inserted after
// the get check.
byte[] kKey = Longs.toByteArray(blockID.getLocalID());
try {
db.getStore().delete(kKey);
} catch (IOException e) {
throw new StorageContainerException("Unable to find the block.",
NO_SUCH_BLOCK);
}
// Decrement blockcount here
container.getContainerData().decrKeyCount();
}
db.delete(kKey);
// Decrement blockcount here
container.getContainerData().decrKeyCount();
}
/**
@ -258,18 +260,19 @@ public class BlockManagerImpl implements BlockManager {
List<BlockData> result = null;
KeyValueContainerData cData = (KeyValueContainerData) container
.getContainerData();
MetadataStore db = BlockUtils.getDB(cData, config);
result = new ArrayList<>();
byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
List<Map.Entry<byte[], byte[]>> range =
db.getSequentialRangeKVs(startKeyInBytes, count,
MetadataKeyFilters.getNormalKeyFilter());
for (Map.Entry<byte[], byte[]> entry : range) {
BlockData value = BlockUtils.getBlockData(entry.getValue());
BlockData data = new BlockData(value.getBlockID());
result.add(data);
try(ReferenceCountedDB db = BlockUtils.getDB(cData, config)) {
result = new ArrayList<>();
byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
List<Map.Entry<byte[], byte[]>> range =
db.getStore().getSequentialRangeKVs(startKeyInBytes, count,
MetadataKeyFilters.getNormalKeyFilter());
for (Map.Entry<byte[], byte[]> entry : range) {
BlockData value = BlockUtils.getBlockData(entry.getValue());
BlockData data = new BlockData(value.getBlockID());
result.add(data);
}
return result;
}
return result;
}
/**

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.utils.BackgroundTaskQueue;
import org.apache.hadoop.utils.BackgroundTaskResult;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -185,69 +185,71 @@ public class BlockDeletingService extends BackgroundService{
ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
long startTime = Time.monotonicNow();
// Scan container's db and get list of under deletion blocks
MetadataStore meta = BlockUtils.getDB(
(KeyValueContainerData) containerData, conf);
// # of blocks to delete is throttled
KeyPrefixFilter filter =
new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
meta.getSequentialRangeKVs(null, blockLimitPerTask, filter);
if (toDeleteBlocks.isEmpty()) {
LOG.debug("No under deletion block found in container : {}",
containerData.getContainerID());
}
try (ReferenceCountedDB meta = BlockUtils.getDB(containerData, conf)) {
// # of blocks to delete is throttled
KeyPrefixFilter filter =
new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
meta.getStore().getSequentialRangeKVs(null, blockLimitPerTask,
filter);
if (toDeleteBlocks.isEmpty()) {
LOG.debug("No under deletion block found in container : {}",
containerData.getContainerID());
}
List<String> succeedBlocks = new LinkedList<>();
LOG.debug("Container : {}, To-Delete blocks : {}",
containerData.getContainerID(), toDeleteBlocks.size());
File dataDir = new File(containerData.getChunksPath());
if (!dataDir.exists() || !dataDir.isDirectory()) {
LOG.error("Invalid container data dir {} : "
+ "does not exist or not a directory", dataDir.getAbsolutePath());
List<String> succeedBlocks = new LinkedList<>();
LOG.debug("Container : {}, To-Delete blocks : {}",
containerData.getContainerID(), toDeleteBlocks.size());
File dataDir = new File(containerData.getChunksPath());
if (!dataDir.exists() || !dataDir.isDirectory()) {
LOG.error("Invalid container data dir {} : "
+ "does not exist or not a directory", dataDir.getAbsolutePath());
return crr;
}
toDeleteBlocks.forEach(entry -> {
String blockName = DFSUtil.bytes2String(entry.getKey());
LOG.debug("Deleting block {}", blockName);
try {
ContainerProtos.BlockData data =
ContainerProtos.BlockData.parseFrom(entry.getValue());
for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
File chunkFile = dataDir.toPath()
.resolve(chunkInfo.getChunkName()).toFile();
if (FileUtils.deleteQuietly(chunkFile)) {
LOG.debug("block {} chunk {} deleted", blockName,
chunkFile.getAbsolutePath());
}
}
succeedBlocks.add(blockName);
} catch (InvalidProtocolBufferException e) {
LOG.error("Failed to parse block info for block {}", blockName, e);
}
});
// Once files are deleted... replace deleting entries with deleted
// entries
BatchOperation batch = new BatchOperation();
succeedBlocks.forEach(entry -> {
String blockId =
entry.substring(OzoneConsts.DELETING_KEY_PREFIX.length());
String deletedEntry = OzoneConsts.DELETED_KEY_PREFIX + blockId;
batch.put(DFSUtil.string2Bytes(deletedEntry),
DFSUtil.string2Bytes(blockId));
batch.delete(DFSUtil.string2Bytes(entry));
});
meta.getStore().writeBatch(batch);
// update count of pending deletion blocks in in-memory container status
containerData.decrPendingDeletionBlocks(succeedBlocks.size());
if (!succeedBlocks.isEmpty()) {
LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
containerData.getContainerID(), succeedBlocks.size(),
Time.monotonicNow() - startTime);
}
crr.addAll(succeedBlocks);
return crr;
}
toDeleteBlocks.forEach(entry -> {
String blockName = DFSUtil.bytes2String(entry.getKey());
LOG.debug("Deleting block {}", blockName);
try {
ContainerProtos.BlockData data =
ContainerProtos.BlockData.parseFrom(entry.getValue());
for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
File chunkFile = dataDir.toPath()
.resolve(chunkInfo.getChunkName()).toFile();
if (FileUtils.deleteQuietly(chunkFile)) {
LOG.debug("block {} chunk {} deleted", blockName,
chunkFile.getAbsolutePath());
}
}
succeedBlocks.add(blockName);
} catch (InvalidProtocolBufferException e) {
LOG.error("Failed to parse block info for block {}", blockName, e);
}
});
// Once files are deleted... replace deleting entries with deleted entries
BatchOperation batch = new BatchOperation();
succeedBlocks.forEach(entry -> {
String blockId =
entry.substring(OzoneConsts.DELETING_KEY_PREFIX.length());
String deletedEntry = OzoneConsts.DELETED_KEY_PREFIX + blockId;
batch.put(DFSUtil.string2Bytes(deletedEntry),
DFSUtil.string2Bytes(blockId));
batch.delete(DFSUtil.string2Bytes(entry));
});
meta.writeBatch(batch);
// update count of pending deletion blocks in in-memory container status
containerData.decrPendingDeletionBlocks(succeedBlocks.size());
if (!succeedBlocks.isEmpty()) {
LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
containerData.getContainerID(), succeedBlocks.size(),
Time.monotonicNow() - startTime);
}
crr.addAll(succeedBlocks);
return crr;
}
@Override

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -180,28 +180,31 @@ public class ContainerReader implements Runnable {
KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
KeyValueContainer kvContainer = new KeyValueContainer(
kvContainerData, config);
MetadataStore containerDB = BlockUtils.getDB(kvContainerData, config);
MetadataKeyFilters.KeyPrefixFilter filter =
new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETING_KEY_PREFIX);
int numPendingDeletionBlocks =
containerDB.getSequentialRangeKVs(null, Integer.MAX_VALUE, filter)
.size();
kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
byte[] delTxnId = containerDB.get(
DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX));
if (delTxnId != null) {
kvContainerData
.updateDeleteTransactionId(Longs.fromByteArray(delTxnId));
try(ReferenceCountedDB containerDB = BlockUtils.getDB(kvContainerData,
config)) {
MetadataKeyFilters.KeyPrefixFilter filter =
new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETING_KEY_PREFIX);
int numPendingDeletionBlocks =
containerDB.getStore().getSequentialRangeKVs(null,
Integer.MAX_VALUE, filter)
.size();
kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
byte[] delTxnId = containerDB.getStore().get(
DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX));
if (delTxnId != null) {
kvContainerData
.updateDeleteTransactionId(Longs.fromByteArray(delTxnId));
}
// sets the BlockCommitSequenceId.
byte[] bcsId = containerDB.getStore().get(DFSUtil.string2Bytes(
OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX));
if (bcsId != null) {
kvContainerData
.updateBlockCommitSequenceId(Longs.fromByteArray(bcsId));
}
containerSet.addContainer(kvContainer);
}
// sets the BlockCommitSequenceId.
byte[] bcsId = containerDB.get(
DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX));
if (bcsId != null) {
kvContainerData
.updateBlockCommitSequenceId(Longs.fromByteArray(bcsId));
}
containerSet.addContainer(kvContainer);
} else {
throw new StorageContainerException("Container File is corrupted. " +
"ContainerType is KeyValueContainer but cast to " +

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -109,30 +109,31 @@ public class TestKeyValueBlockIterator {
createContainerWithBlocks(containerID, normalBlocks, deletedBlocks);
String containerPath = new File(containerData.getMetadataPath())
.getParent();
KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerID, new File(containerPath));
try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerID, new File(containerPath))) {
int counter = 0;
while(keyValueBlockIterator.hasNext()) {
BlockData blockData = keyValueBlockIterator.nextBlock();
assertEquals(blockData.getLocalID(), counter++);
}
int counter = 0;
while (keyValueBlockIterator.hasNext()) {
BlockData blockData = keyValueBlockIterator.nextBlock();
assertEquals(blockData.getLocalID(), counter++);
}
assertFalse(keyValueBlockIterator.hasNext());
assertFalse(keyValueBlockIterator.hasNext());
keyValueBlockIterator.seekToFirst();
counter = 0;
while(keyValueBlockIterator.hasNext()) {
BlockData blockData = keyValueBlockIterator.nextBlock();
assertEquals(blockData.getLocalID(), counter++);
}
assertFalse(keyValueBlockIterator.hasNext());
keyValueBlockIterator.seekToFirst();
counter = 0;
while (keyValueBlockIterator.hasNext()) {
BlockData blockData = keyValueBlockIterator.nextBlock();
assertEquals(blockData.getLocalID(), counter++);
}
assertFalse(keyValueBlockIterator.hasNext());
try {
keyValueBlockIterator.nextBlock();
} catch (NoSuchElementException ex) {
GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
"for ContainerID " + containerID, ex);
try {
keyValueBlockIterator.nextBlock();
} catch (NoSuchElementException ex) {
GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
"for ContainerID " + containerID, ex);
}
}
}
@ -142,17 +143,18 @@ public class TestKeyValueBlockIterator {
createContainerWithBlocks(containerID, 2, 0);
String containerPath = new File(containerData.getMetadataPath())
.getParent();
KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerID, new File(containerPath));
long blockID = 0L;
assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerID, new File(containerPath))) {
long blockID = 0L;
assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
try {
keyValueBlockIterator.nextBlock();
} catch (NoSuchElementException ex) {
GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
"for ContainerID " + containerID, ex);
try {
keyValueBlockIterator.nextBlock();
} catch (NoSuchElementException ex) {
GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
"for ContainerID " + containerID, ex);
}
}
}
@ -162,42 +164,41 @@ public class TestKeyValueBlockIterator {
createContainerWithBlocks(containerID, 2, 0);
String containerPath = new File(containerData.getMetadataPath())
.getParent();
KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerID, new File(containerPath));
long blockID = 0L;
try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerID, new File(containerPath))) {
long blockID = 0L;
// Even calling multiple times hasNext() should not move entry forward.
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
// Even calling multiple times hasNext() should not move entry forward.
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertTrue(keyValueBlockIterator.hasNext());
assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
keyValueBlockIterator.seekToLast();
assertTrue(keyValueBlockIterator.hasNext());
assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
keyValueBlockIterator.seekToLast();
assertTrue(keyValueBlockIterator.hasNext());
assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
keyValueBlockIterator.seekToFirst();
blockID = 0L;
assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
keyValueBlockIterator.seekToFirst();
blockID = 0L;
assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
try {
keyValueBlockIterator.nextBlock();
} catch (NoSuchElementException ex) {
GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
"for ContainerID " + containerID, ex);
try {
keyValueBlockIterator.nextBlock();
} catch (NoSuchElementException ex) {
GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
"for ContainerID " + containerID, ex);
}
}
}
@Test
@ -208,14 +209,15 @@ public class TestKeyValueBlockIterator {
createContainerWithBlocks(containerId, normalBlocks, deletedBlocks);
String containerPath = new File(containerData.getMetadataPath())
.getParent();
KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerId, new File(containerPath), MetadataKeyFilters
.getDeletingKeyFilter());
.getDeletingKeyFilter())) {
int counter = 5;
while(keyValueBlockIterator.hasNext()) {
BlockData blockData = keyValueBlockIterator.nextBlock();
assertEquals(blockData.getLocalID(), counter++);
int counter = 5;
while (keyValueBlockIterator.hasNext()) {
BlockData blockData = keyValueBlockIterator.nextBlock();
assertEquals(blockData.getLocalID(), counter++);
}
}
}
@ -226,11 +228,12 @@ public class TestKeyValueBlockIterator {
createContainerWithBlocks(containerId, 0, 5);
String containerPath = new File(containerData.getMetadataPath())
.getParent();
KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerId, new File(containerPath));
//As all blocks are deleted blocks, blocks does not match with normal key
// filter.
assertFalse(keyValueBlockIterator.hasNext());
try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerId, new File(containerPath))) {
//As all blocks are deleted blocks, blocks does not match with normal key
// filter.
assertFalse(keyValueBlockIterator.hasNext());
}
}
/**
@ -251,27 +254,30 @@ public class TestKeyValueBlockIterator {
container = new KeyValueContainer(containerData, conf);
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID
.randomUUID().toString());
MetadataStore metadataStore = BlockUtils.getDB(containerData, conf);
try(ReferenceCountedDB metadataStore = BlockUtils.getDB(containerData,
conf)) {
List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
ChunkInfo info = new ChunkInfo("chunkfile", 0, 1024);
chunkList.add(info.getProtoBufMessage());
List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
ChunkInfo info = new ChunkInfo("chunkfile", 0, 1024);
chunkList.add(info.getProtoBufMessage());
for (int i=0; i<normalBlocks; i++) {
BlockID blockID = new BlockID(containerId, i);
BlockData blockData = new BlockData(blockID);
blockData.setChunks(chunkList);
metadataStore.put(Longs.toByteArray(blockID.getLocalID()), blockData
.getProtoBufMessage().toByteArray());
}
for (int i = 0; i < normalBlocks; i++) {
BlockID blockID = new BlockID(containerId, i);
BlockData blockData = new BlockData(blockID);
blockData.setChunks(chunkList);
metadataStore.getStore().put(Longs.toByteArray(blockID.getLocalID()),
blockData
.getProtoBufMessage().toByteArray());
}
for (int i=normalBlocks; i<deletedBlocks; i++) {
BlockID blockID = new BlockID(containerId, i);
BlockData blockData = new BlockData(blockID);
blockData.setChunks(chunkList);
metadataStore.put(DFSUtil.string2Bytes(OzoneConsts
.DELETING_KEY_PREFIX + blockID.getLocalID()), blockData
.getProtoBufMessage().toByteArray());
for (int i = normalBlocks; i < deletedBlocks; i++) {
BlockID blockID = new BlockID(containerId, i);
BlockData blockData = new BlockData(blockID);
blockData.setChunks(chunkList);
metadataStore.getStore().put(DFSUtil.string2Bytes(OzoneConsts
.DELETING_KEY_PREFIX + blockID.getLocalID()), blockData
.getProtoBufMessage().toByteArray());
}
}
}

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.junit.Assert;
import org.junit.Before;
@ -132,23 +132,24 @@ public class TestKeyValueContainer {
private void addBlocks(int count) throws Exception {
long containerId = keyValueContainerData.getContainerID();
MetadataStore metadataStore = BlockUtils.getDB(keyValueContainer
.getContainerData(), conf);
for (int i=0; i < count; i++) {
// Creating BlockData
BlockID blockID = new BlockID(containerId, i);
BlockData blockData = new BlockData(blockID);
blockData.addMetadata("VOLUME", "ozone");
blockData.addMetadata("OWNER", "hdfs");
List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, 1024);
chunkList.add(info.getProtoBufMessage());
blockData.setChunks(chunkList);
metadataStore.put(Longs.toByteArray(blockID.getLocalID()), blockData
.getProtoBufMessage().toByteArray());
try(ReferenceCountedDB metadataStore = BlockUtils.getDB(keyValueContainer
.getContainerData(), conf)) {
for (int i = 0; i < count; i++) {
// Creating BlockData
BlockID blockID = new BlockID(containerId, i);
BlockData blockData = new BlockData(blockID);
blockData.addMetadata("VOLUME", "ozone");
blockData.addMetadata("OWNER", "hdfs");
List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, 1024);
chunkList.add(info.getProtoBufMessage());
blockData.setChunks(chunkList);
metadataStore.getStore().put(Longs.toByteArray(blockID.getLocalID()),
blockData
.getProtoBufMessage().toByteArray());
}
}
}
@SuppressWarnings("RedundantCast")
@ -191,9 +192,12 @@ public class TestKeyValueContainer {
int numberOfKeysToWrite = 12;
//write one few keys to check the key count after import
MetadataStore metadataStore = BlockUtils.getDB(keyValueContainerData, conf);
for (int i = 0; i < numberOfKeysToWrite; i++) {
metadataStore.put(("test" + i).getBytes(UTF_8), "test".getBytes(UTF_8));
try(ReferenceCountedDB metadataStore =
BlockUtils.getDB(keyValueContainerData, conf)) {
for (int i = 0; i < numberOfKeysToWrite; i++) {
metadataStore.getStore().put(("test" + i).getBytes(UTF_8),
"test".getBytes(UTF_8));
}
}
BlockUtils.removeDB(keyValueContainerData, conf);

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingP
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -149,48 +149,50 @@ import static org.junit.Assert.assertTrue;
container = new KeyValueContainer(containerData, conf);
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
UUID.randomUUID().toString());
MetadataStore metadataStore = BlockUtils.getDB(containerData, conf);
chunkManager = new ChunkManagerImpl(true);
try (ReferenceCountedDB metadataStore = BlockUtils.getDB(containerData,
conf)) {
chunkManager = new ChunkManagerImpl(true);
assertTrue(containerData.getChunksPath() != null);
File chunksPath = new File(containerData.getChunksPath());
assertTrue(chunksPath.exists());
// Initially chunks folder should be empty.
assertTrue(chunksPath.listFiles().length == 0);
assertTrue(containerData.getChunksPath() != null);
File chunksPath = new File(containerData.getChunksPath());
assertTrue(chunksPath.exists());
// Initially chunks folder should be empty.
assertTrue(chunksPath.listFiles().length == 0);
List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
for (int i = 0; i < (totalBlks); i++) {
BlockID blockID = new BlockID(containerId, i);
BlockData blockData = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
for (int i = 0; i < (totalBlks); i++) {
BlockID blockID = new BlockID(containerId, i);
BlockData blockData = new BlockData(blockID);
chunkList.clear();
for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) {
String chunkName = strBlock + i + strChunk + chunkCount;
long offset = chunkCount * chunkLen;
ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen);
chunkList.add(info.getProtoBufMessage());
chunkManager
.writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
new DispatcherContext.Builder()
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.build());
chunkManager
.writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
new DispatcherContext.Builder()
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
.build());
}
blockData.setChunks(chunkList);
chunkList.clear();
for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) {
String chunkName = strBlock + i + strChunk + chunkCount;
long offset = chunkCount * chunkLen;
ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen);
chunkList.add(info.getProtoBufMessage());
chunkManager
.writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
new DispatcherContext.Builder()
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.build());
chunkManager
.writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
new DispatcherContext.Builder()
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
.build());
}
blockData.setChunks(chunkList);
if (i >= normalBlocks) {
// deleted key
metadataStore.put(DFSUtil.string2Bytes(
OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID()),
blockData.getProtoBufMessage().toByteArray());
} else {
// normal key
metadataStore.put(Longs.toByteArray(blockID.getLocalID()),
blockData.getProtoBufMessage().toByteArray());
if (i >= normalBlocks) {
// deleted key
metadataStore.getStore().put(DFSUtil.string2Bytes(
OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID()),
blockData.getProtoBufMessage().toByteArray());
} else {
// normal key
metadataStore.getStore().put(Longs.toByteArray(blockID.getLocalID()),
blockData.getProtoBufMessage().toByteArray());
}
}
}
}

View File

@ -40,7 +40,7 @@ import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import java.io.IOException;
import java.io.OutputStream;
@ -119,16 +119,17 @@ public class TestStorageContainerManagerHelper {
public List<String> getPendingDeletionBlocks(Long containerID)
throws IOException {
List<String> pendingDeletionBlocks = Lists.newArrayList();
MetadataStore meta = getContainerMetadata(containerID);
ReferenceCountedDB meta = getContainerMetadata(containerID);
KeyPrefixFilter filter =
new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> kvs = meta
List<Map.Entry<byte[], byte[]>> kvs = meta.getStore()
.getRangeKVs(null, Integer.MAX_VALUE, filter);
kvs.forEach(entry -> {
String key = DFSUtil.bytes2String(entry.getKey());
pendingDeletionBlocks
.add(key.replace(OzoneConsts.DELETING_KEY_PREFIX, ""));
});
meta.close();
return pendingDeletionBlocks;
}
@ -143,17 +144,18 @@ public class TestStorageContainerManagerHelper {
public List<Long> getAllBlocks(Long containeID) throws IOException {
List<Long> allBlocks = Lists.newArrayList();
MetadataStore meta = getContainerMetadata(containeID);
ReferenceCountedDB meta = getContainerMetadata(containeID);
List<Map.Entry<byte[], byte[]>> kvs =
meta.getRangeKVs(null, Integer.MAX_VALUE,
meta.getStore().getRangeKVs(null, Integer.MAX_VALUE,
MetadataKeyFilters.getNormalKeyFilter());
kvs.forEach(entry -> {
allBlocks.add(Longs.fromByteArray(entry.getKey()));
});
meta.close();
return allBlocks;
}
private MetadataStore getContainerMetadata(Long containerID)
private ReferenceCountedDB getContainerMetadata(Long containerID)
throws IOException {
ContainerWithPipeline containerWithPipeline = cluster
.getStorageContainerManager().getClientProtocolServer()

View File

@ -953,18 +953,19 @@ public abstract class TestOzoneRpcClientAbstract {
.getContainerData());
String containerPath = new File(containerData.getMetadataPath())
.getParent();
KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerID, new File(containerPath));
while (keyValueBlockIterator.hasNext()) {
BlockData blockData = keyValueBlockIterator.nextBlock();
if (blockData.getBlockID().getLocalID() == localID) {
long length = 0;
List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
for (ContainerProtos.ChunkInfo chunk : chunks) {
length += chunk.getLen();
try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerID, new File(containerPath))) {
while (keyValueBlockIterator.hasNext()) {
BlockData blockData = keyValueBlockIterator.nextBlock();
if (blockData.getBlockID().getLocalID() == localID) {
long length = 0;
List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
for (ContainerProtos.ChunkInfo chunk : chunks) {
length += chunk.getLen();
}
Assert.assertEquals(length, keyValue.getBytes().length);
break;
}
Assert.assertEquals(length, keyValue.getBytes().length);
break;
}
}
}
@ -1115,31 +1116,32 @@ public abstract class TestOzoneRpcClientAbstract {
(KeyValueContainerData) container.getContainerData();
String containerPath =
new File(containerData.getMetadataPath()).getParent();
KeyValueBlockIterator keyValueBlockIterator =
new KeyValueBlockIterator(containerID, new File(containerPath));
try (KeyValueBlockIterator keyValueBlockIterator =
new KeyValueBlockIterator(containerID, new File(containerPath))) {
// Find the block corresponding to the key we put. We use the localID of
// the BlockData to identify out key.
BlockData blockData = null;
while (keyValueBlockIterator.hasNext()) {
blockData = keyValueBlockIterator.nextBlock();
if (blockData.getBlockID().getLocalID() == localID) {
break;
// Find the block corresponding to the key we put. We use the localID of
// the BlockData to identify out key.
BlockData blockData = null;
while (keyValueBlockIterator.hasNext()) {
blockData = keyValueBlockIterator.nextBlock();
if (blockData.getBlockID().getLocalID() == localID) {
break;
}
}
Assert.assertNotNull("Block not found", blockData);
// Get the location of the chunk file
String chunkName = blockData.getChunks().get(0).getChunkName();
String containreBaseDir =
container.getContainerData().getVolume().getHddsRootDir().getPath();
File chunksLocationPath = KeyValueContainerLocationUtil
.getChunksLocationPath(containreBaseDir, scmId, containerID);
File chunkFile = new File(chunksLocationPath, chunkName);
// Corrupt the contents of the chunk file
String newData = new String("corrupted data");
FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes());
}
Assert.assertNotNull("Block not found", blockData);
// Get the location of the chunk file
String chunkName = blockData.getChunks().get(0).getChunkName();
String containreBaseDir =
container.getContainerData().getVolume().getHddsRootDir().getPath();
File chunksLocationPath = KeyValueContainerLocationUtil
.getChunksLocationPath(containreBaseDir, scmId, containerID);
File chunkFile = new File(chunksLocationPath, chunkName);
// Corrupt the contents of the chunk file
String newData = new String("corrupted data");
FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes());
}
@Test

View File

@ -45,7 +45,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
@ -119,35 +119,36 @@ public class TestBlockDeletingService {
containerSet.addContainer(container);
data = (KeyValueContainerData) containerSet.getContainer(
containerID).getContainerData();
MetadataStore metadata = BlockUtils.getDB(data, conf);
for (int j = 0; j<numOfBlocksPerContainer; j++) {
BlockID blockID =
ContainerTestHelper.getTestBlockID(containerID);
String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
blockID.getLocalID();
BlockData kd = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
for (int k = 0; k<numOfChunksPerBlock; k++) {
// offset doesn't matter here
String chunkName = blockID.getLocalID() + "_chunk_" + k;
File chunk = new File(data.getChunksPath(), chunkName);
FileUtils.writeStringToFile(chunk, "a chunk",
Charset.defaultCharset());
LOG.info("Creating file {}", chunk.getAbsolutePath());
// make sure file exists
Assert.assertTrue(chunk.isFile() && chunk.exists());
ContainerProtos.ChunkInfo info =
ContainerProtos.ChunkInfo.newBuilder()
.setChunkName(chunk.getAbsolutePath())
.setLen(0)
.setOffset(0)
.setChecksumData(Checksum.getNoChecksumDataProto())
.build();
chunks.add(info);
try(ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
for (int j = 0; j < numOfBlocksPerContainer; j++) {
BlockID blockID =
ContainerTestHelper.getTestBlockID(containerID);
String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
blockID.getLocalID();
BlockData kd = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
for (int k = 0; k < numOfChunksPerBlock; k++) {
// offset doesn't matter here
String chunkName = blockID.getLocalID() + "_chunk_" + k;
File chunk = new File(data.getChunksPath(), chunkName);
FileUtils.writeStringToFile(chunk, "a chunk",
Charset.defaultCharset());
LOG.info("Creating file {}", chunk.getAbsolutePath());
// make sure file exists
Assert.assertTrue(chunk.isFile() && chunk.exists());
ContainerProtos.ChunkInfo info =
ContainerProtos.ChunkInfo.newBuilder()
.setChunkName(chunk.getAbsolutePath())
.setLen(0)
.setOffset(0)
.setChecksumData(Checksum.getNoChecksumDataProto())
.build();
chunks.add(info);
}
kd.setChunks(chunks);
metadata.getStore().put(DFSUtil.string2Bytes(deleteStateName),
kd.getProtoBufMessage().toByteArray());
}
kd.setChunks(chunks);
metadata.put(DFSUtil.string2Bytes(deleteStateName),
kd.getProtoBufMessage().toByteArray());
}
}
}
@ -166,17 +167,19 @@ public class TestBlockDeletingService {
* Get under deletion blocks count from DB,
* note this info is parsed from container.db.
*/
private int getUnderDeletionBlocksCount(MetadataStore meta)
private int getUnderDeletionBlocksCount(ReferenceCountedDB meta)
throws IOException {
List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
meta.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter()
meta.getStore().getRangeKVs(null, 100,
new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETING_KEY_PREFIX));
return underDeletionBlocks.size();
}
private int getDeletedBlocksCount(MetadataStore db) throws IOException {
private int getDeletedBlocksCount(ReferenceCountedDB db) throws IOException {
List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
db.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter()
db.getStore().getRangeKVs(null, 100,
new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETED_KEY_PREFIX));
return underDeletionBlocks.size();
}
@ -202,37 +205,38 @@ public class TestBlockDeletingService {
containerSet.listContainer(0L, 1, containerData);
Assert.assertEquals(1, containerData.size());
MetadataStore meta = BlockUtils.getDB(
(KeyValueContainerData) containerData.get(0), conf);
Map<Long, Container> containerMap = containerSet.getContainerMapCopy();
// NOTE: this test assumes that all the container is KetValueContainer and
// have DeleteTransactionId in KetValueContainerData. If other
// types is going to be added, this test should be checked.
long transactionId = ((KeyValueContainerData)containerMap
.get(containerData.get(0).getContainerID()).getContainerData())
.getDeleteTransactionId();
try(ReferenceCountedDB meta = BlockUtils.getDB(
(KeyValueContainerData) containerData.get(0), conf)) {
Map<Long, Container> containerMap = containerSet.getContainerMapCopy();
// NOTE: this test assumes that all the container is KetValueContainer and
// have DeleteTransactionId in KetValueContainerData. If other
// types is going to be added, this test should be checked.
long transactionId = ((KeyValueContainerData) containerMap
.get(containerData.get(0).getContainerID()).getContainerData())
.getDeleteTransactionId();
// Number of deleted blocks in container should be equal to 0 before
// block delete
Assert.assertEquals(0, transactionId);
// Number of deleted blocks in container should be equal to 0 before
// block delete
Assert.assertEquals(0, transactionId);
// Ensure there are 3 blocks under deletion and 0 deleted blocks
Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(0, getDeletedBlocksCount(meta));
// Ensure there are 3 blocks under deletion and 0 deleted blocks
Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(0, getDeletedBlocksCount(meta));
// An interval will delete 1 * 2 blocks
deleteAndWait(svc, 1);
Assert.assertEquals(1, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(2, getDeletedBlocksCount(meta));
// An interval will delete 1 * 2 blocks
deleteAndWait(svc, 1);
Assert.assertEquals(1, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(2, getDeletedBlocksCount(meta));
deleteAndWait(svc, 2);
Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(3, getDeletedBlocksCount(meta));
deleteAndWait(svc, 2);
Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(3, getDeletedBlocksCount(meta));
deleteAndWait(svc, 3);
Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(3, getDeletedBlocksCount(meta));
deleteAndWait(svc, 3);
Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(3, getDeletedBlocksCount(meta));
}
svc.shutdown();
}
@ -311,25 +315,26 @@ public class TestBlockDeletingService {
// get container meta data
List<ContainerData> containerData = Lists.newArrayList();
containerSet.listContainer(0L, 1, containerData);
MetadataStore meta = BlockUtils.getDB(
(KeyValueContainerData) containerData.get(0), conf);
try(ReferenceCountedDB meta = BlockUtils.getDB(
(KeyValueContainerData) containerData.get(0), conf)) {
LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
GenericTestUtils.waitFor(() -> {
try {
if (getUnderDeletionBlocksCount(meta) == 0) {
return true;
LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
GenericTestUtils.waitFor(() -> {
try {
if (getUnderDeletionBlocksCount(meta) == 0) {
return true;
}
} catch (IOException ignored) {
}
} catch (IOException ignored) {
}
return false;
}, 1000, 100000);
newLog.stopCapturing();
return false;
}, 1000, 100000);
newLog.stopCapturing();
// The block deleting successfully and shouldn't catch timed
// out warning log.
Assert.assertTrue(!newLog.getOutput().contains(
"Background task executes timed out, retrying in next interval"));
// The block deleting successfully and shouldn't catch timed
// out warning log.
Assert.assertTrue(!newLog.getOutput().contains(
"Background task executes timed out, retrying in next interval"));
}
svc.shutdown();
}

View File

@ -47,7 +47,7 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -202,7 +202,7 @@ public class TestContainerPersistence {
Path meta = kvData.getDbFile().toPath().getParent();
Assert.assertTrue(meta != null && Files.exists(meta));
MetadataStore store = null;
ReferenceCountedDB store = null;
try {
store = BlockUtils.getDB(kvData, conf);
Assert.assertNotNull(store);

View File

@ -50,7 +50,7 @@ import org.apache.hadoop.ozone.ozShell.TestOzoneShell;
import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@ -300,9 +300,12 @@ public class TestBlockDeletion {
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet();
OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
MetadataStore db = BlockUtils.getDB((KeyValueContainerData) dnContainerSet
.getContainer(blockID.getContainerID()).getContainerData(), conf);
Assert.assertNotNull(db.get(Longs.toByteArray(blockID.getLocalID())));
try(ReferenceCountedDB db =
BlockUtils.getDB((KeyValueContainerData) dnContainerSet
.getContainer(blockID.getContainerID()).getContainerData(), conf)) {
Assert.assertNotNull(db.getStore().get(
Longs.toByteArray(blockID.getLocalID())));
}
}, omKeyLocationInfoGroups);
}
@ -312,13 +315,16 @@ public class TestBlockDeletion {
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet();
OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
MetadataStore db = BlockUtils.getDB((KeyValueContainerData) dnContainerSet
.getContainer(blockID.getContainerID()).getContainerData(), conf);
Assert.assertNull(db.get(Longs.toByteArray(blockID.getLocalID())));
Assert.assertNull(db.get(DFSUtil.string2Bytes(
OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID())));
Assert.assertNotNull(DFSUtil
.string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blockID.getLocalID()));
try(ReferenceCountedDB db =
BlockUtils.getDB((KeyValueContainerData) dnContainerSet
.getContainer(blockID.getContainerID()).getContainerData(), conf)) {
Assert.assertNull(db.getStore().get(
Longs.toByteArray(blockID.getLocalID())));
Assert.assertNull(db.getStore().get(DFSUtil.string2Bytes(
OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID())));
Assert.assertNotNull(DFSUtil.string2Bytes(
OzoneConsts.DELETED_KEY_PREFIX + blockID.getLocalID()));
}
containerIdsWithDeletedBlocks.add(blockID.getContainerID());
}, omKeyLocationInfoGroups);
}

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -226,7 +226,7 @@ public class TestCloseContainerByPipeline {
List<DatanodeDetails> datanodes = pipeline.getNodes();
Assert.assertEquals(3, datanodes.size());
List<MetadataStore> metadataStores = new ArrayList<>(datanodes.size());
List<ReferenceCountedDB> metadataStores = new ArrayList<>(datanodes.size());
for (DatanodeDetails details : datanodes) {
Assert.assertFalse(isContainerClosed(cluster, containerID, details));
//send the order to close the container
@ -237,8 +237,10 @@ public class TestCloseContainerByPipeline {
Container dnContainer = cluster.getHddsDatanodes().get(index)
.getDatanodeStateMachine().getContainer().getContainerSet()
.getContainer(containerID);
metadataStores.add(BlockUtils.getDB((KeyValueContainerData) dnContainer
.getContainerData(), conf));
try(ReferenceCountedDB store = BlockUtils.getDB(
(KeyValueContainerData) dnContainer.getContainerData(), conf)) {
metadataStores.add(store);
}
}
// There should be as many rocks db as the number of datanodes in pipeline.