HDDS-72. Add deleteTransactionId field in ContainerInfo. Contributed by Lokesh Jain.

This commit is contained in:
Xiaoyu Yao 2018-06-11 16:02:32 -07:00
parent 7c3dc39083
commit 23bfd9f7e4
25 changed files with 402 additions and 71 deletions

View File

@ -32,6 +32,8 @@
import java.io.IOException;
import java.util.Comparator;
import static java.lang.Math.max;
/**
* Class wraps ozone container info.
*/
@ -60,6 +62,7 @@ public class ContainerInfo
private long stateEnterTime;
private String owner;
private long containerID;
private long deleteTransactionId;
ContainerInfo(
long containerID,
HddsProtos.LifeCycleState state,
@ -68,7 +71,8 @@ public class ContainerInfo
long usedBytes,
long numberOfKeys,
long stateEnterTime,
String owner) {
String owner,
long deleteTransactionId) {
this.containerID = containerID;
this.pipeline = pipeline;
this.allocatedBytes = allocatedBytes;
@ -78,6 +82,7 @@ public class ContainerInfo
this.state = state;
this.stateEnterTime = stateEnterTime;
this.owner = owner;
this.deleteTransactionId = deleteTransactionId;
}
/**
@ -96,6 +101,7 @@ public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) {
builder.setStateEnterTime(info.getStateEnterTime());
builder.setOwner(info.getOwner());
builder.setContainerID(info.getContainerID());
builder.setDeleteTransactionId(info.getDeleteTransactionId());
return builder.build();
}
@ -141,6 +147,14 @@ public long getNumberOfKeys() {
return numberOfKeys;
}
public long getDeleteTransactionId() {
return deleteTransactionId;
}
public void updateDeleteTransactionId(long transactionId) {
deleteTransactionId = max(transactionId, deleteTransactionId);
}
public ContainerID containerID() {
return new ContainerID(getContainerID());
}
@ -174,6 +188,7 @@ public HddsProtos.SCMContainerInfo getProtobuf() {
builder.setState(state);
builder.setStateEnterTime(stateEnterTime);
builder.setContainerID(getContainerID());
builder.setDeleteTransactionId(deleteTransactionId);
if (getOwner() != null) {
builder.setOwner(getOwner());
@ -292,6 +307,7 @@ public static class Builder {
private long stateEnterTime;
private String owner;
private long containerID;
private long deleteTransactionId;
public Builder setContainerID(long id) {
Preconditions.checkState(id >= 0);
@ -334,10 +350,15 @@ public Builder setOwner(String containerOwner) {
return this;
}
public Builder setDeleteTransactionId(long deleteTransactionId) {
this.deleteTransactionId = deleteTransactionId;
return this;
}
public ContainerInfo build() {
return new
ContainerInfo(containerID, state, pipeline,
allocated, used, keys, stateEnterTime, owner);
ContainerInfo(containerID, state, pipeline, allocated,
used, keys, stateEnterTime, owner, deleteTransactionId);
}
}
}

View File

@ -114,6 +114,8 @@ public static Versioning getVersioning(boolean versioning) {
public static final String OZONE_HANDLER_LOCAL = "local";
public static final String DELETING_KEY_PREFIX = "#deleting#";
public static final String DELETED_KEY_PREFIX = "#deleted#";
public static final String DELETE_TRANSACTION_KEY_PREFIX = "#delTX#";
public static final String OPEN_KEY_PREFIX = "#open#";
public static final String OPEN_KEY_ID_DELIMINATOR = "#";

View File

@ -22,17 +22,27 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import java.util.ArrayList;
import java.util.List;
/**
* An utility class to filter levelDB keys.
*/
public final class MetadataKeyFilters {
private static KeyPrefixFilter deletingKeyFilter =
new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX);
new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETING_KEY_PREFIX);
private static KeyPrefixFilter deletedKeyFilter =
new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETED_KEY_PREFIX);
private static KeyPrefixFilter normalKeyFilter =
new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX,
true);
new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETING_KEY_PREFIX, true)
.addFilter(OzoneConsts.DELETED_KEY_PREFIX, true)
.addFilter(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX, true);
private MetadataKeyFilters() {
}
@ -41,6 +51,10 @@ public static KeyPrefixFilter getDeletingKeyFilter() {
return deletingKeyFilter;
}
public static KeyPrefixFilter getDeletedKeyFilter() {
return deletedKeyFilter;
}
public static KeyPrefixFilter getNormalKeyFilter() {
return normalKeyFilter;
}
@ -73,37 +87,95 @@ default int getKeysHintedNum() {
*/
public static class KeyPrefixFilter implements MetadataKeyFilter {
private String keyPrefix = null;
private List<String> positivePrefixList = new ArrayList<>();
private List<String> negativePrefixList = new ArrayList<>();
private boolean atleastOnePositiveMatch;
private int keysScanned = 0;
private int keysHinted = 0;
private Boolean negative;
public KeyPrefixFilter(String keyPrefix) {
this(keyPrefix, false);
public KeyPrefixFilter() {}
/**
* KeyPrefixFilter constructor. It is made of positive and negative prefix
* list. PositivePrefixList is the list of prefixes which are accepted
* whereas negativePrefixList contains the list of prefixes which are
* rejected.
*
* @param atleastOnePositiveMatch if positive it requires key to be accepted
* by atleast one positive filter.
*/
public KeyPrefixFilter(boolean atleastOnePositiveMatch) {
this.atleastOnePositiveMatch = atleastOnePositiveMatch;
}
public KeyPrefixFilter(String keyPrefix, boolean negative) {
this.keyPrefix = keyPrefix;
this.negative = negative;
public KeyPrefixFilter addFilter(String keyPrefix) {
addFilter(keyPrefix, false);
return this;
}
public KeyPrefixFilter addFilter(String keyPrefix, boolean negative) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(keyPrefix),
"KeyPrefix is null or empty: " + keyPrefix);
// keyPrefix which needs to be added should not be prefix of any opposing
// filter already present. If keyPrefix is a negative filter it should not
// be a prefix of any positive filter. Nor should any opposing filter be
// a prefix of keyPrefix.
// For example if b0 is accepted b can not be rejected and
// if b is accepted b0 can not be rejected. If these scenarios need to be
// handled we need to add priorities.
if (negative) {
Preconditions.checkArgument(positivePrefixList.stream().noneMatch(
prefix -> prefix.startsWith(keyPrefix) || keyPrefix
.startsWith(prefix)),
"KeyPrefix: " + keyPrefix + " already accepted.");
this.negativePrefixList.add(keyPrefix);
} else {
Preconditions.checkArgument(negativePrefixList.stream().noneMatch(
prefix -> prefix.startsWith(keyPrefix) || keyPrefix
.startsWith(prefix)),
"KeyPrefix: " + keyPrefix + " already rejected.");
this.positivePrefixList.add(keyPrefix);
}
return this;
}
@Override
public boolean filterKey(byte[] preKey, byte[] currentKey,
byte[] nextKey) {
keysScanned++;
boolean accept = false;
if (Strings.isNullOrEmpty(keyPrefix)) {
accept = true;
} else {
byte [] prefixBytes = DFSUtil.string2Bytes(keyPrefix);
if (currentKey != null && prefixMatch(prefixBytes, currentKey)) {
keysHinted++;
accept = true;
} else {
accept = false;
}
if (currentKey == null) {
return false;
}
return (negative) ? !accept : accept;
boolean accept;
// There are no filters present
if (positivePrefixList.isEmpty() && negativePrefixList.isEmpty()) {
return true;
}
accept = !positivePrefixList.isEmpty() && positivePrefixList.stream()
.anyMatch(prefix -> {
byte[] prefixBytes = DFSUtil.string2Bytes(prefix);
return prefixMatch(prefixBytes, currentKey);
});
if (accept) {
keysHinted++;
return true;
} else if (atleastOnePositiveMatch) {
return false;
}
accept = !negativePrefixList.isEmpty() && negativePrefixList.stream()
.allMatch(prefix -> {
byte[] prefixBytes = DFSUtil.string2Bytes(prefix);
return !prefixMatch(prefixBytes, currentKey);
});
if (accept) {
keysHinted++;
return true;
}
return false;
}
@Override
@ -116,7 +188,7 @@ public int getKeysHintedNum() {
return keysHinted;
}
private boolean prefixMatch(byte[] prefix, byte[] key) {
private static boolean prefixMatch(byte[] prefix, byte[] key) {
Preconditions.checkNotNull(prefix);
Preconditions.checkNotNull(key);
if (key.length < prefix.length) {

View File

@ -146,6 +146,7 @@ message SCMContainerInfo {
required uint64 numberOfKeys = 6;
optional int64 stateEnterTime = 7;
required string owner = 8;
optional int64 deleteTransactionId = 9;
}
message GetScmInfoRequestProto {

View File

@ -275,7 +275,7 @@ public void testGetRangeKVs() throws IOException {
// Filter keys by prefix.
// It should returns all "b*" entries.
MetadataKeyFilter filter1 = new KeyPrefixFilter("b");
MetadataKeyFilter filter1 = new KeyPrefixFilter().addFilter("b");
result = store.getRangeKVs(null, 100, filter1);
Assert.assertEquals(10, result.size());
Assert.assertTrue(result.stream().allMatch(entry ->
@ -422,4 +422,63 @@ public void testBatchWrite() throws IOException {
Assert.assertEquals(8, count.get());
}
@Test
public void testKeyPrefixFilter() throws IOException {
List<Map.Entry<byte[], byte[]>> result = null;
RuntimeException exception = null;
try {
new KeyPrefixFilter().addFilter("b0", true).addFilter("b");
} catch (IllegalArgumentException e) {
exception = e;
}
Assert.assertTrue(
exception.getMessage().contains("KeyPrefix: b already rejected"));
try {
new KeyPrefixFilter().addFilter("b0").addFilter("b", true);
} catch (IllegalArgumentException e) {
exception = e;
}
Assert.assertTrue(
exception.getMessage().contains("KeyPrefix: b already accepted"));
try {
new KeyPrefixFilter().addFilter("b", true).addFilter("b0");
} catch (IllegalArgumentException e) {
exception = e;
}
Assert.assertTrue(
exception.getMessage().contains("KeyPrefix: b0 already rejected"));
try {
new KeyPrefixFilter().addFilter("b").addFilter("b0", true);
} catch (IllegalArgumentException e) {
exception = e;
}
Assert.assertTrue(
exception.getMessage().contains("KeyPrefix: b0 already accepted"));
MetadataKeyFilter filter1 = new KeyPrefixFilter(true)
.addFilter("a0")
.addFilter("a1")
.addFilter("b", true);
result = store.getRangeKVs(null, 100, filter1);
Assert.assertEquals(2, result.size());
Assert.assertTrue(result.stream()
.anyMatch(entry -> new String(entry.getKey()).startsWith("a0"))
&& result.stream()
.anyMatch(entry -> new String(entry.getKey()).startsWith("a1")));
filter1 = new KeyPrefixFilter(true).addFilter("b", true);
result = store.getRangeKVs(null, 100, filter1);
Assert.assertEquals(0, result.size());
filter1 = new KeyPrefixFilter().addFilter("b", true);
result = store.getRangeKVs(null, 100, filter1);
Assert.assertEquals(10, result.size());
Assert.assertTrue(result.stream()
.allMatch(entry -> new String(entry.getKey()).startsWith("a")));
}
}

View File

@ -33,6 +33,8 @@
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import static java.lang.Math.max;
/**
* This class maintains the information about a container in the ozone world.
* <p>
@ -57,6 +59,7 @@ public class ContainerData {
* Number of pending deletion blocks in container.
*/
private int numPendingDeletionBlocks;
private long deleteTransactionId;
private AtomicLong readBytes;
private AtomicLong writeBytes;
private AtomicLong readCount;
@ -78,6 +81,7 @@ public ContainerData(long containerID,
this.containerID = containerID;
this.state = ContainerLifeCycleState.OPEN;
this.numPendingDeletionBlocks = 0;
this.deleteTransactionId = 0;
this.readCount = new AtomicLong(0L);
this.readBytes = new AtomicLong(0L);
this.writeCount = new AtomicLong(0L);
@ -101,6 +105,7 @@ public ContainerData(long containerID, Configuration conf,
this.containerID = containerID;
this.state = state;
this.numPendingDeletionBlocks = 0;
this.deleteTransactionId = 0;
this.readCount = new AtomicLong(0L);
this.readBytes = new AtomicLong(0L);
this.writeCount = new AtomicLong(0L);
@ -425,6 +430,22 @@ public int getNumPendingDeletionBlocks() {
return this.numPendingDeletionBlocks;
}
/**
* Sets deleteTransactionId to latest delete transactionId for the container.
*
* @param transactionId latest transactionId of the container.
*/
public void updateDeleteTransactionId(long transactionId) {
deleteTransactionId = max(transactionId, deleteTransactionId);
}
/**
* Return the latest deleteTransactionId of the container.
*/
public long getDeleteTransactionId() {
return deleteTransactionId;
}
/**
* Get the number of bytes read from the container.
* @return the number of bytes read from the container.

View File

@ -20,6 +20,8 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import static java.lang.Math.max;
/**
* Container Report iterates the closed containers and sends a container report
* to SCM.
@ -35,6 +37,7 @@ public class ContainerReport {
private long readBytes;
private long writeBytes;
private long containerID;
private long deleteTransactionId;
public long getContainerID() {
return containerID;
@ -63,6 +66,7 @@ public ContainerReport(long containerID, String finalhash) {
this.readBytes = 0L;
this.writeCount = 0L;
this.writeBytes = 0L;
this.deleteTransactionId = 0;
}
/**
@ -96,6 +100,9 @@ public static ContainerReport getFromProtoBuf(ContainerInfo info) {
if (info.hasWriteBytes()) {
report.setWriteBytes(info.getWriteBytes());
}
if (info.hasDeleteTransactionId()) {
report.updateDeleteTransactionId(info.getDeleteTransactionId());
}
report.setContainerID(info.getContainerID());
return report;
@ -186,6 +193,10 @@ public void setBytesUsed(long bytesUsed) {
this.bytesUsed = bytesUsed;
}
public void updateDeleteTransactionId(long transactionId) {
this.deleteTransactionId = max(transactionId, deleteTransactionId);
}
/**
* Gets a containerInfo protobuf message from ContainerReports.
*
@ -202,6 +213,7 @@ public ContainerInfo getProtoBufMessage() {
.setWriteBytes(this.getWriteBytes())
.setFinalhash(this.getFinalhash())
.setContainerID(this.getContainerID())
.setDeleteTransactionId(this.deleteTransactionId)
.build();
}
}

View File

@ -20,11 +20,13 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@ -246,12 +248,18 @@ private void readContainerInfo(String containerName)
}
containerData = ContainerData.getFromProtBuf(containerDataProto, conf);
// Initialize pending deletion blocks count in in-memory
// container status.
// Initialize pending deletion blocks and deleted blocks count in
// in-memory containerData.
MetadataStore metadata = KeyUtils.getDB(containerData, conf);
List<Map.Entry<byte[], byte[]>> underDeletionBlocks = metadata
.getSequentialRangeKVs(null, Integer.MAX_VALUE,
MetadataKeyFilters.getDeletingKeyFilter());
byte[] transactionID = metadata.get(DFSUtil.string2Bytes(
OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + containerID));
if (transactionID != null) {
containerData
.updateDeleteTransactionId(Longs.fromByteArray(transactionID));
}
containerData.incrPendingDeletionBlocks(underDeletionBlocks.size());
List<Map.Entry<byte[], byte[]>> liveKeys = metadata
@ -908,7 +916,8 @@ public ContainerReportsProto getContainerReport() throws IOException {
.setWriteCount(container.getWriteCount())
.setReadBytes(container.getReadBytes())
.setWriteBytes(container.getWriteBytes())
.setState(getState(containerId));
.setState(getState(containerId))
.setDeleteTransactionId(container.getDeleteTransactionId());
crBuilder.addReports(ciBuilder.build());
}

View File

@ -175,8 +175,8 @@ public BackgroundTaskResult call() throws Exception {
// Scan container's db and get list of under deletion blocks
MetadataStore meta = KeyUtils.getDB(containerData, conf);
// # of blocks to delete is throttled
KeyPrefixFilter filter = new KeyPrefixFilter(
OzoneConsts.DELETING_KEY_PREFIX);
KeyPrefixFilter filter =
new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
meta.getSequentialRangeKVs(null, blockLimitPerTask, filter);
if (toDeleteBlocks.isEmpty()) {
@ -214,10 +214,16 @@ public BackgroundTaskResult call() throws Exception {
}
});
// Once files are deleted ... clean up DB
// Once files are deleted... replace deleting entries with deleted entries
BatchOperation batch = new BatchOperation();
succeedBlocks.forEach(entry ->
batch.delete(DFSUtil.string2Bytes(entry)));
succeedBlocks.forEach(entry -> {
String blockId =
entry.substring(OzoneConsts.DELETING_KEY_PREFIX.length());
String deletedEntry = OzoneConsts.DELETED_KEY_PREFIX + blockId;
batch.put(DFSUtil.string2Bytes(deletedEntry),
DFSUtil.string2Bytes(blockId));
batch.delete(DFSUtil.string2Bytes(entry));
});
meta.writeBatch(batch);
// update count of pending deletion blocks in in-memory container status
containerManager.decrPendingDeletionBlocks(succeedBlocks.size(),

View File

@ -186,6 +186,9 @@ private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
LOG.debug("Block {} not found or already under deletion in"
+ " container {}, skip deleting it.", blk, containerId);
}
containerDB.put(DFSUtil.string2Bytes(
OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + delTX.getContainerID()),
Longs.toByteArray(delTX.getTxID()));
}
// update pending deletion blocks count in in-memory container status

View File

@ -160,6 +160,7 @@ message ContainerInfo {
optional int64 writeBytes = 8;
optional string finalhash = 9;
optional hadoop.hdds.LifeCycleState state = 10;
optional int64 deleteTransactionId = 11;
}
/*

View File

@ -361,13 +361,10 @@ public void deleteBlocks(List<BlockID> blockIDs) throws IOException {
}
}
// We update SCM DB first, so if this step fails, we end up here,
// nothing gets into the delLog so no blocks will be accidentally
// removed. If we write the log first, once log is written, the
// async deleting service will start to scan and might be picking
// up some blocks to do real deletions, that might cause data loss.
try {
deletedBlockLog.addTransactions(containerBlocks);
Map<Long, Long> deleteTransactionsMap =
deletedBlockLog.addTransactions(containerBlocks);
containerManager.updateDeleteTransactionId(deleteTransactionsMap);
} catch (IOException e) {
throw new IOException(
"Skip writing the deleted blocks info to"

View File

@ -108,9 +108,10 @@ void addTransaction(long containerID, List<Long> blocks)
* number of containers) together (on success) or non (on failure).
*
* @param containerBlocksMap a map of containerBlocks.
* @return Mapping from containerId to latest transactionId for the container.
* @throws IOException
*/
void addTransactions(Map<Long, List<Long>> containerBlocksMap)
Map<Long, Long> addTransactions(Map<Long, List<Long>> containerBlocksMap)
throws IOException;
/**

View File

@ -36,6 +36,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@ -306,12 +307,15 @@ public int getNumOfValidTransactions() throws IOException {
* {@inheritDoc}
*
* @param containerBlocksMap a map of containerBlocks.
* @return Mapping from containerId to latest transactionId for the container.
* @throws IOException
*/
@Override
public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
public Map<Long, Long> addTransactions(
Map<Long, List<Long>> containerBlocksMap)
throws IOException {
BatchOperation batch = new BatchOperation();
Map<Long, Long> deleteTransactionsMap = new HashMap<>();
lock.lock();
try {
long currentLatestID = lastTxID;
@ -321,11 +325,13 @@ public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
byte[] key = Longs.toByteArray(currentLatestID);
DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
entry.getKey(), entry.getValue());
deleteTransactionsMap.put(entry.getKey(), currentLatestID);
batch.put(key, tx.toByteArray());
}
lastTxID = currentLatestID;
batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
deletedStore.writeBatch(batch);
return deleteTransactionsMap;
} finally {
lock.unlock();
}

View File

@ -341,6 +341,39 @@ public HddsProtos.LifeCycleState updateContainerState(
}
}
/**
* Update deleteTransactionId according to deleteTransactionMap.
*
* @param deleteTransactionMap Maps the containerId to latest delete
* transaction id for the container.
* @throws IOException
*/
public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
throws IOException {
lock.lock();
try {
for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
long containerID = entry.getKey();
byte[] dbKey = Longs.toByteArray(containerID);
byte[] containerBytes = containerStore.get(dbKey);
if (containerBytes == null) {
throw new SCMException(
"Failed to increment number of deleted blocks for container "
+ containerID + ", reason : " + "container doesn't exist.",
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
HddsProtos.SCMContainerInfo.parseFrom(containerBytes));
containerInfo.updateDeleteTransactionId(entry.getValue());
containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
containerStateManager
.updateDeleteTransactionId(containerID, entry.getValue());
}
} finally {
lock.unlock();
}
}
/**
* Returns the container State Manager.
*
@ -441,6 +474,7 @@ private HddsProtos.SCMContainerInfo reconcileState(
builder.setState(knownState.getState());
builder.setStateEnterTime(knownState.getStateEnterTime());
builder.setContainerID(knownState.getContainerID());
builder.setDeleteTransactionId(knownState.getDeleteTransactionId());
if (knownState.getOwner() != null) {
builder.setOwner(knownState.getOwner());
}
@ -571,6 +605,7 @@ public void flushContainerInfo() throws IOException {
.setPipeline(oldInfo.getPipeline())
.setState(oldInfo.getState())
.setUsedBytes(oldInfo.getUsedBytes())
.setDeleteTransactionId(oldInfo.getDeleteTransactionId())
.build();
containerStore.put(dbKey, newInfo.getProtobuf().toByteArray());
} else {

View File

@ -304,6 +304,7 @@ public ContainerInfo allocateContainer(PipelineSelector selector, HddsProtos
.setStateEnterTime(Time.monotonicNow())
.setOwner(owner)
.setContainerID(containerCount.incrementAndGet())
.setDeleteTransactionId(0)
.build();
Preconditions.checkNotNull(containerInfo);
containers.addContainer(containerInfo);
@ -351,6 +352,17 @@ public ContainerInfo updateContainerInfo(ContainerInfo info)
return containers.getContainerInfo(info);
}
/**
* Update deleteTransactionId for a container.
*
* @param containerID ContainerID of the container whose delete
* transactionId needs to be updated.
* @param transactionId latest transactionId to be updated for the container
*/
public void updateDeleteTransactionId(Long containerID, long transactionId) {
containers.getContainerMap().get(ContainerID.valueof(containerID))
.updateDeleteTransactionId(transactionId);
}
/**
* Return a container matching the attributes specified.

View File

@ -26,6 +26,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* Mapping class contains the mapping from a name to a pipeline mapping. This is
@ -104,6 +105,16 @@ void processContainerReports(DatanodeDetails datanodeDetails,
ContainerReportsProto reports)
throws IOException;
/**
* Update deleteTransactionId according to deleteTransactionMap.
*
* @param deleteTransactionMap Maps the containerId to latest delete
* transaction id for the container.
* @throws IOException
*/
void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
throws IOException;
/**
* Returns the nodeManager.
* @return NodeManager

View File

@ -109,8 +109,24 @@ public void testDeleteBlock() throws Exception {
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
type, factor, containerOwner);
Assert.assertNotNull(block);
long transactionId =
mapping.getContainer(block.getBlockID().getContainerID())
.getDeleteTransactionId();
Assert.assertEquals(0, transactionId);
blockManager.deleteBlocks(Collections.singletonList(
block.getBlockID()));
Assert.assertEquals(++transactionId,
mapping.getContainer(block.getBlockID().getContainerID())
.getDeleteTransactionId());
block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
type, factor, containerOwner);
Assert.assertNotNull(block);
blockManager.deleteBlocks(Collections.singletonList(
block.getBlockID()));
Assert.assertEquals(++transactionId,
mapping.getContainer(block.getBlockID().getContainerID())
.getDeleteTransactionId());
}
@Test

View File

@ -203,7 +203,8 @@ public void testFullContainerReport() throws IOException {
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L)
.setContainerID(info.getContainerID());
.setContainerID(info.getContainerID())
.setDeleteTransactionId(0);
reports.add(ciBuilder.build());
@ -237,7 +238,8 @@ public void testContainerCloseWithContainerReport() throws IOException {
.setWriteCount(500000000L)
.setReadBytes(5368705120L)
.setWriteBytes(5368705120L)
.setContainerID(info.getContainerID());
.setContainerID(info.getContainerID())
.setDeleteTransactionId(0);
reports.add(ciBuilder.build());

View File

@ -212,7 +212,8 @@ private void sendContainerReport(ContainerInfo info, long used) throws
.setReadCount(100000000L)
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L);
.setWriteBytes(2000000000L)
.setDeleteTransactionId(0);
reports.addReports(ciBuilder);
mapping.processContainerReports(TestUtils.getDatanodeDetails(),
reports.build());

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
@ -120,7 +121,7 @@ public List<String> getPendingDeletionBlocks(Long containerID)
List<String> pendingDeletionBlocks = Lists.newArrayList();
MetadataStore meta = getContainerMetadata(containerID);
KeyPrefixFilter filter =
new KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX);
new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> kvs = meta
.getRangeKVs(null, Integer.MAX_VALUE, filter);
kvs.forEach(entry -> {
@ -147,7 +148,8 @@ public List<Long> getAllBlocks(Long containeID) throws IOException {
(preKey, currentKey, nextKey) -> !DFSUtil.bytes2String(currentKey)
.startsWith(OzoneConsts.DELETING_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> kvs =
meta.getRangeKVs(null, Integer.MAX_VALUE, filter);
meta.getRangeKVs(null, Integer.MAX_VALUE,
MetadataKeyFilters.getNormalKeyFilter());
kvs.forEach(entry -> {
allBlocks.add(Longs.fromByteArray(entry.getKey()));
});

View File

@ -66,8 +66,6 @@
.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
import static org.apache.hadoop.ozone.container
.ContainerTestHelper.createSingleNodePipeline;
/**
* Tests to test block deleting service.
@ -183,8 +181,15 @@ private void deleteAndWait(BlockDeletingServiceTestImpl service,
private int getUnderDeletionBlocksCount(MetadataStore meta)
throws IOException {
List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
meta.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter(
OzoneConsts.DELETING_KEY_PREFIX));
meta.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETING_KEY_PREFIX));
return underDeletionBlocks.size();
}
private int getDeletedBlocksCount(MetadataStore db) throws IOException {
List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
db.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETED_KEY_PREFIX));
return underDeletionBlocks.size();
}
@ -205,20 +210,34 @@ public void testBlockDeletion() throws Exception {
List<ContainerData> containerData = Lists.newArrayList();
containerManager.listContainer(0L, 1, containerData);
Assert.assertEquals(1, containerData.size());
MetadataStore meta = KeyUtils.getDB(containerData.get(0), conf);
// Ensure there is 100 blocks under deletion
MetadataStore meta = KeyUtils.getDB(containerData.get(0), conf);
Map<Long, ContainerData> containerMap =
((ContainerManagerImpl) containerManager).getContainerMap();
long transactionId =
containerMap.get(containerData.get(0).getContainerID())
.getDeleteTransactionId();
// Number of deleted blocks in container should be equal to 0 before
// block delete
Assert.assertEquals(0, transactionId);
// Ensure there are 3 blocks under deletion and 0 deleted blocks
Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(0, getDeletedBlocksCount(meta));
// An interval will delete 1 * 2 blocks
deleteAndWait(svc, 1);
Assert.assertEquals(1, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(2, getDeletedBlocksCount(meta));
deleteAndWait(svc, 2);
Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(3, getDeletedBlocksCount(meta));
deleteAndWait(svc, 3);
Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
Assert.assertEquals(3, getDeletedBlocksCount(meta));
svc.shutdown();
shutdownContainerMangaer(containerManager);

View File

@ -638,7 +638,8 @@ public void testDeleteKey() throws IOException, OzoneException {
MetadataStore store = cluster.getKeySpaceManager().
getMetadataManager().getStore();
List<Map.Entry<byte[], byte[]>> list = store.getRangeKVs(null, 10,
new MetadataKeyFilters.KeyPrefixFilter(DELETING_KEY_PREFIX));
new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(DELETING_KEY_PREFIX));
Assert.assertEquals(1, list.size());
// Delete the key again to test deleting non-existing key.

View File

@ -352,8 +352,8 @@ public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
ResultCodes.FAILED_BUCKET_NOT_FOUND);
}
MetadataKeyFilter filter = new KeyPrefixFilter(
getKeyWithDBPrefix(volumeName, bucketName, keyPrefix));
MetadataKeyFilter filter = new KeyPrefixFilter()
.addFilter(getKeyWithDBPrefix(volumeName, bucketName, keyPrefix));
List<Map.Entry<byte[], byte[]>> rangeResult;
if (!Strings.isNullOrEmpty(startKey)) {
@ -449,7 +449,8 @@ private VolumeList getVolumesByUser(byte[] userNameKey)
private VolumeList getAllVolumes() throws IOException {
// Scan all users in database
KeyPrefixFilter filter = new KeyPrefixFilter(OzoneConsts.KSM_USER_PREFIX);
KeyPrefixFilter filter =
new KeyPrefixFilter().addFilter(OzoneConsts.KSM_USER_PREFIX);
// We are not expecting a huge number of users per cluster,
// it should be fine to scan all users in db and return us a
// list of volume names in string per user.
@ -497,7 +498,7 @@ public List<BlockGroup> getExpiredOpenKeys() throws IOException {
List<BlockGroup> keyBlocksList = Lists.newArrayList();
long now = Time.now();
final MetadataKeyFilter openKeyFilter =
new KeyPrefixFilter(OPEN_KEY_PREFIX);
new KeyPrefixFilter().addFilter(OPEN_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> rangeResult =
store.getSequentialRangeKVs(null, Integer.MAX_VALUE,
openKeyFilter);

View File

@ -64,9 +64,14 @@ public void initialize() throws IOException {
.setPipeline(pipeline)
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0).setUsedBytes(0).setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow()).setOwner("OZONE")
.setContainerID(x).build();
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(x)
.setDeleteTransactionId(0)
.build();
stateMap.addContainer(containerInfo);
currentCount++;
} catch (SCMException e) {
@ -80,9 +85,14 @@ public void initialize() throws IOException {
.setPipeline(pipeline)
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0).setUsedBytes(0).setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow()).setOwner("OZONE")
.setContainerID(y).build();
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(y)
.setDeleteTransactionId(0)
.build();
stateMap.addContainer(containerInfo);
currentCount++;
} catch (SCMException e) {
@ -95,9 +105,14 @@ public void initialize() throws IOException {
.setPipeline(pipeline)
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0).setUsedBytes(0).setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow()).setOwner("OZONE")
.setContainerID(currentCount++).build();
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(currentCount++)
.setDeleteTransactionId(0)
.build();
stateMap.addContainer(containerInfo);
} catch (SCMException e) {
e.printStackTrace();
@ -155,9 +170,14 @@ public void createContainerBenchMark(BenchMarkContainerStateMap state,
.setPipeline(pipeline)
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0).setUsedBytes(0).setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow()).setOwner("OZONE")
.setContainerID(cid).build();
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(cid)
.setDeleteTransactionId(0)
.build();
state.stateMap.addContainer(containerInfo);
}