HDFS-12443. Ozone: Improve SCM block deletion throttling algorithm. Contributed by Yiqun Lin.

Yiqun Lin 2017-11-06 20:21:51 +08:00 committed by Owen O'Malley
parent 7ebe79e879
commit 132f30c8ea
10 changed files with 470 additions and 171 deletions

View File

@@ -165,7 +165,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
try {
containerDB.writeBatch(batch);
newDeletionBlocks++;
LOG.info("Transited Block {} to DELETING state in container {}",
LOG.debug("Transited Block {} to DELETING state in container {}",
blk, containerId);
} catch (IOException e) {
// if some blocks failed to delete, we fail this TX,
@@ -175,7 +175,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
"Failed to delete blocks for TXID = " + delTX.getTxID(), e);
}
} else {
LOG.info("Block {} not found or already under deletion in"
LOG.debug("Block {} not found or already under deletion in"
+ " container {}, skip deleting it.", blk, containerId);
}
}

View File

@@ -811,7 +811,8 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
+ "success={}", result.getTxID(), result.getSuccess());
}
if (result.getSuccess()) {
LOG.info("Purging TXID={} from block deletion log", result.getTxID());
LOG.debug("Purging TXID={} from block deletion log",
result.getTxID());
this.getScmBlockManager().getDeletedBlockLog()
.commitTransactions(Collections.singletonList(result.getTxID()));
} else {

View File

@@ -78,4 +78,9 @@ public interface BlockManager extends Closeable {
* @throws IOException
*/
void stop() throws IOException;
/**
* @return the block deleting service running in SCM.
*/
SCMBlockDeletingService getSCMBlockDeletingService();
}

View File

@@ -148,7 +148,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
blockDeletingService =
new SCMBlockDeletingService(
deletedBlockLog, containerManager, nodeManager, svcInterval,
- serviceTimeout);
+ serviceTimeout, conf);
}
/**
@@ -525,4 +525,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// factor. Hence returning 0 for now.
// containers.get(OzoneProtos.LifeCycleState.OPEN).size();
}
@Override
public SCMBlockDeletingService getSCMBlockDeletingService() {
return this.blockDeletingService;
}
}

View File

@@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.block;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.scm.container.Mapping;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import com.google.common.collect.ArrayListMultimap;
/**
* A wrapper class to hold info about a datanode and all deleted block
* transactions that will be sent to this datanode.
*/
public class DatanodeDeletedBlockTransactions {
private int nodeNum;
// The throttle size for each datanode.
private int maximumAllowedTXNum;
// Current counter of inserted TX.
private int currentTXNum;
private Mapping mappingService;
// A list of TXs mapped to a certain datanode ID.
private final ArrayListMultimap<DatanodeID, DeletedBlocksTransaction>
transactions;
DatanodeDeletedBlockTransactions(Mapping mappingService,
int maximumAllowedTXNum, int nodeNum) {
this.transactions = ArrayListMultimap.create();
this.mappingService = mappingService;
this.maximumAllowedTXNum = maximumAllowedTXNum;
this.nodeNum = nodeNum;
}
public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
ContainerInfo info = null;
try {
info = mappingService.getContainer(tx.getContainerName());
} catch (IOException e) {
SCMBlockDeletingService.LOG.warn("Got container info error.", e);
}
if (info == null) {
SCMBlockDeletingService.LOG.warn(
"Container {} not found, continue to process next",
tx.getContainerName());
return;
}
for (DatanodeID dnID : info.getPipeline().getMachines()) {
if (transactions.containsKey(dnID)) {
List<DeletedBlocksTransaction> txs = transactions.get(dnID);
if (txs != null && txs.size() < maximumAllowedTXNum) {
boolean hasContained = false;
for (DeletedBlocksTransaction t : txs) {
if (t.getContainerName().equals(tx.getContainerName())) {
hasContained = true;
break;
}
}
if (!hasContained) {
txs.add(tx);
currentTXNum++;
}
}
} else {
currentTXNum++;
transactions.put(dnID, tx);
}
SCMBlockDeletingService.LOG.debug("Transaction added: {} <- TX({})", dnID,
tx.getTxID());
}
}
Set<DatanodeID> getDatanodes() {
return transactions.keySet();
}
boolean isEmpty() {
return transactions.isEmpty();
}
boolean hasTransactions(DatanodeID dnID) {
return transactions.containsKey(dnID) && !transactions.get(dnID).isEmpty();
}
List<DeletedBlocksTransaction> getDatanodeTransactions(
DatanodeID dnID) {
return transactions.get(dnID);
}
List<String> getTransactionIDList(DatanodeID dnID) {
if (hasTransactions(dnID)) {
return transactions.get(dnID).stream()
.map(DeletedBlocksTransaction::getTxID).map(String::valueOf)
.collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
boolean isFull() {
return currentTXNum >= maximumAllowedTXNum * nodeNum;
}
int getTXNum() {
return currentTXNum;
}
}
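For concreteness, a minimal usage sketch of this holder (not part of the patch): it assumes mapping is a Mapping whose getContainer("c1") returns a ContainerInfo whose pipeline contains exactly one datanode, mocked much like the mockContainerInfo() helper in the tests further down; the class name and the numbers are illustrative only.

package org.apache.hadoop.ozone.scm.block;
import java.io.IOException;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.scm.container.Mapping;
class ThrottleHolderSketch {
// Assumes 'mapping' resolves container "c1" to a pipeline with one datanode.
static void demo(Mapping mapping) throws IOException {
// One healthy datanode, at most 2 TXs allowed for it.
DatanodeDeletedBlockTransactions holder =
new DatanodeDeletedBlockTransactions(mapping, 2, 1);
DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
.setTxID(1).setContainerName("c1").setCount(0).build();
holder.addTransaction(tx); // accepted: currentTXNum becomes 1
holder.addTransaction(tx); // skipped: "c1" already queued for that node
assert !holder.isFull(); // 1 < maximumAllowedTXNum (2) * nodeNum (1)
}
}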

View File

@@ -45,6 +45,16 @@ public interface DeletedBlockLog extends Closeable {
List<DeletedBlocksTransaction> getTransactions(int count)
throws IOException;
/**
* Scans the entire log once and adds TXs to DatanodeDeletedBlockTransactions.
* Once DatanodeDeletedBlockTransactions is full, the scan stops.
* @param transactions the holder that scanned TXs are added into.
* @throws IOException
*/
void getTransactions(DatanodeDeletedBlockTransactions transactions)
throws IOException;
/**
* Return all failed transactions in the log. A transaction is considered
* to be failed if it has been sent more than MAX_RETRY limit and its

View File

@@ -326,4 +326,26 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
deletedStore.close();
}
}
@Override
public void getTransactions(DatanodeDeletedBlockTransactions transactions)
throws IOException {
lock.lock();
try {
deletedStore.iterate(null, (key, value) -> {
if (!Arrays.equals(LATEST_TXID, key)) {
DeletedBlocksTransaction block = DeletedBlocksTransaction
.parseFrom(value);
if (block.getCount() > -1 && block.getCount() <= maxRetry) {
transactions.addTransaction(block);
}
return !transactions.isFull();
}
return true;
});
} finally {
lock.unlock();
}
}
}
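The early return in the callback above is what enforces the throttle: the store keeps feeding entries to the callback only while it returns true, so returning !transactions.isFull() halts the scan as soon as the holder is full. A minimal model of that assumed iterate contract, with a plain map standing in for the LevelDB-backed deletedStore (an illustration of the assumed semantics, not the real MetadataStore API):

import java.io.IOException;
import java.util.Map;
class IterateContractSketch {
interface EntryConsumer {
// Return false to stop the iteration early.
boolean consume(byte[] key, byte[] value) throws IOException;
}
static void iterate(Map<byte[], byte[]> store, EntryConsumer consumer)
throws IOException {
for (Map.Entry<byte[], byte[]> e : store.entrySet()) {
if (!consumer.consume(e.getKey(), e.getValue())) {
return; // the callback reported the holder is full; stop scanning
}
}
}
}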

View File

@@ -16,14 +16,16 @@
*/
package org.apache.hadoop.ozone.scm.block;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.scm.container.Mapping;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.BackgroundTask;
@@ -32,13 +34,12 @@ import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* A background service running in SCM to delete blocks. This service scans
@@ -49,7 +50,7 @@ import java.util.stream.Collectors;
*/
public class SCMBlockDeletingService extends BackgroundService {
- private static final Logger LOG =
+ static final Logger LOG =
LoggerFactory.getLogger(SCMBlockDeletingService.class);
// ThreadPoolSize=2: one for the scheduler and the other for the scanner.
@@ -58,28 +59,36 @@ public class SCMBlockDeletingService extends BackgroundService {
private final Mapping mappingService;
private final NodeManager nodeManager;
// Default container size is 5G and block size is 256MB, so a full container
// contains at most 20 blocks; hence each TX contains at most 20 blocks.
// When SCM sends block deletion TXs to a datanode, each command allows
// at most 50 containers, which limits the number of to-be-deleted blocks
// to less than 1000.
// TODO - a better throttle algorithm
// Note, this is not an accurate limit of blocks. When we scan
// the log, in the worst case we may get 50 TXs for 50 different datanodes,
// which makes the deletion messages sent by SCM extremely small.
// As a result, deletion will be slow. An improvement is to scan the
// log multiple times until we get enough TXs for each datanode, or
// the entire log is scanned.
private static final int BLOCK_DELETE_TX_PER_REQUEST_LIMIT = 50;
// The block delete limit size is dynamically calculated from the container
// delete limit size (ozone.block.deleting.container.limit.per.interval)
// configured for the datanode. To ensure DNs do not wait for
// delete commands, we multiply this value by a factor of 2 to get the
// final TX limit size for each node.
// Currently we implement a throttle algorithm that throttles delete blocks
// for each datanode. Each node is limited to this calculated size. First,
// current node info is fetched from the node manager, then the entire
// delLog is scanned from beginning to end. If a node reaches the maximum
// value, its records are skipped; if not, scanning continues until it
// reaches the maximum value. Once all nodes are full, the scan stops.
private int blockDeleteLimitSize;
public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
Mapping mapper, NodeManager nodeManager,
- int interval, long serviceTimeout) {
+ int interval, long serviceTimeout, Configuration conf) {
super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
this.deletedBlockLog = deletedBlockLog;
this.mappingService = mapper;
this.nodeManager = nodeManager;
int containerLimit = conf.getInt(
OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
Preconditions.checkArgument(containerLimit > 0,
"Container limit size should be " + "positive.");
// Multiply the container limit by a factor of 2 to ensure DNs
// do not wait for delete commands.
this.blockDeleteLimitSize = containerLimit * 2;
}
@Override
@@ -104,126 +113,60 @@ public class SCMBlockDeletingService extends BackgroundService {
// Scan the SCM DB at each HB interval and collect a throttled
// list of to-delete blocks.
LOG.debug("Running DeletedBlockTransactionScanner");
- DatanodeDeletedBlockTransactions transactions =
- getToDeleteContainerBlocks();
+ DatanodeDeletedBlockTransactions transactions = null;
List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
if (datanodes != null) {
transactions = new DatanodeDeletedBlockTransactions(mappingService,
blockDeleteLimitSize, datanodes.size());
try {
deletedBlockLog.getTransactions(transactions);
} catch (IOException e) {
// We may tolerate a number of failures for some time,
// but if it continues to fail, at some point we need to raise
// an exception and probably fail the SCM? At present, it simply
// continues to retry the scan.
LOG.error("Failed to get block deletion transactions from delTX log",
e);
}
LOG.debug("Scanned deleted blocks log and got {} delTX to process.",
transactions.getTXNum());
}
if (transactions != null && !transactions.isEmpty()) {
for (DatanodeID datanodeID : transactions.getDatanodes()) {
List<DeletedBlocksTransaction> dnTXs = transactions
.getDatanodeTransactions(datanodeID);
dnTxCount += dnTXs.size();
// TODO commandQueue needs a cap.
// We should stop caching new commands if the number of un-processed
// commands is bigger than a limit, e.g. 50. In case a datanode goes
// offline for some time, the cached commands could flood the queue.
nodeManager.addDatanodeCommand(datanodeID,
new DeleteBlocksCommand(dnTXs));
LOG.debug(
"Added delete block command for datanode {} in the queue,"
+ " number of delete block transactions: {}, TxID list: {}",
datanodeID, dnTXs.size(),
String.join(",", transactions.getTransactionIDList(datanodeID)));
if (dnTXs != null && !dnTXs.isEmpty()) {
dnTxCount += dnTXs.size();
// TODO commandQueue needs a cap.
// We should stop caching new commands if the number of un-processed
// commands is bigger than a limit, e.g. 50. In case a datanode goes
// offline for some time, the cached commands could flood the queue.
nodeManager.addDatanodeCommand(datanodeID,
new DeleteBlocksCommand(dnTXs));
LOG.debug(
"Added delete block command for datanode {} in the queue,"
+ " number of delete block transactions: {}, TxID list: {}",
datanodeID, dnTXs.size(), String.join(",",
transactions.getTransactionIDList(datanodeID)));
}
}
}
if (dnTxCount > 0) {
LOG.info("Totally added {} delete blocks command for"
+ " {} datanodes, task elapsed time: {}ms",
LOG.info(
"Totally added {} delete blocks command for"
+ " {} datanodes, task elapsed time: {}ms",
dnTxCount, transactions.getDatanodes().size(),
Time.monotonicNow() - startTime);
}
return EmptyTaskResult.newResult();
}
// Scan deleteBlocks.db to get a number of to-delete blocks.
// This is going to be properly throttled.
private DatanodeDeletedBlockTransactions getToDeleteContainerBlocks() {
DatanodeDeletedBlockTransactions dnTXs =
new DatanodeDeletedBlockTransactions();
List<DeletedBlocksTransaction> txs = null;
try {
// Get a limited number of TXs to send via HB at a time.
txs = deletedBlockLog
.getTransactions(BLOCK_DELETE_TX_PER_REQUEST_LIMIT);
LOG.debug("Scanned deleted blocks log and got {} delTX to process",
txs.size());
} catch (IOException e) {
// We may tolerate a number of failures for some time,
// but if it continues to fail, at some point we need to raise
// an exception and probably fail the SCM? At present, it simply
// continues to retry the scan.
LOG.error("Failed to get block deletion transactions from delTX log",
e);
}
if (txs != null) {
for (DeletedBlocksTransaction tx : txs) {
try {
ContainerInfo info = mappingService
.getContainer(tx.getContainerName());
// Find out the datanodes this TX is supposed to be sent to.
info.getPipeline().getMachines()
.forEach(entry -> dnTXs.addTransaction(entry, tx));
} catch (IOException e) {
LOG.warn("Container {} not found, continue to process next",
tx.getContainerName(), e);
}
}
}
return dnTXs;
}
}
/**
* A wrapper class to hold info about a datanode and all deleted block
* transactions that will be sent to this datanode.
*/
private static class DatanodeDeletedBlockTransactions {
// A list of TXs mapped to a certain datanode ID.
private final Map<DatanodeID, List<DeletedBlocksTransaction>> transactions;
DatanodeDeletedBlockTransactions() {
this.transactions = Maps.newHashMap();
}
void addTransaction(DatanodeID dnID, DeletedBlocksTransaction tx) {
if (transactions.containsKey(dnID)) {
transactions.get(dnID).add(tx);
} else {
List<DeletedBlocksTransaction> first = Lists.newArrayList();
first.add(tx);
transactions.put(dnID, first);
}
LOG.debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
}
Set<DatanodeID> getDatanodes() {
return transactions.keySet();
}
boolean isEmpty() {
return transactions.isEmpty();
}
boolean hasTransactions(DatanodeID dnID) {
return transactions.containsKey(dnID) &&
!transactions.get(dnID).isEmpty();
}
List<DeletedBlocksTransaction> getDatanodeTransactions(DatanodeID dnID) {
return transactions.get(dnID);
}
List<String> getTransactionIDList(DatanodeID dnID) {
if (hasTransactions(dnID)) {
return transactions.get(dnID).stream()
.map(DeletedBlocksTransaction::getTxID)
.map(String::valueOf)
.collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
@VisibleForTesting
public void setBlockDeleteTXNum(int numTXs) {
blockDeleteLimitSize = numTXs;
}
}
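A worked example of the sizing above (the concrete numbers are assumptions for illustration, not values taken from the patch):

// Suppose ozone.block.deleting.container.limit.per.interval is 10 and
// 4 datanodes are HEALTHY when the scanner task runs.
int containerLimit = 10; // from configuration
int blockDeleteLimitSize = containerLimit * 2; // 20 TXs allowed per datanode
int nodeNum = 4; // healthy nodes this interval
// DatanodeDeletedBlockTransactions.isFull() trips once the scan has
// collected blockDeleteLimitSize * nodeNum = 80 TXs in total.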

View File

@@ -22,9 +22,17 @@ import java.io.IOException;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.ozone.scm.block.DeletedBlockLog;
import org.apache.hadoop.ozone.scm.block.SCMBlockDeletingService;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.junit.Rule;
@@ -166,11 +174,17 @@ public class TestStorageContainerManager {
@Test
public void testBlockDeletionTransactions() throws Exception {
int numKeys = 5;
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 5);
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 3000);
conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
// Reset container provision size, otherwise only one container
// is created by default.
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
numKeys);
MiniOzoneCluster cluster =
new MiniOzoneCluster.Builder(conf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
@@ -180,48 +194,14 @@
.getScmBlockManager().getDeletedBlockLog();
Assert.assertEquals(0, delLog.getNumOfValidTransactions());
- // Create 20 randomly named keys.
+ // Create {numKeys} randomly named keys.
TestStorageContainerManagerHelper helper =
new TestStorageContainerManagerHelper(cluster, conf);
Map<String, KsmKeyInfo> keyLocations = helper.createKeys(20, 4096);
Map<String, KsmKeyInfo> keyLocations = helper.createKeys(numKeys, 4096);
// These keys will be written into a bunch of containers;
// get the set of container names and verify the container blocks
// on the datanodes.
Set<String> containerNames = new HashSet<>();
for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
entry.getValue().getKeyLocationList()
.forEach(loc -> containerNames.add(loc.getContainerName()));
}
// The total number of blocks in these containers should be equal to
// the total number of blocks created via the creation calls.
int totalCreatedBlocks = 0;
for (KsmKeyInfo info : keyLocations.values()) {
totalCreatedBlocks += info.getKeyLocationList().size();
}
Assert.assertTrue(totalCreatedBlocks > 0);
Assert.assertEquals(totalCreatedBlocks,
helper.getAllBlocks(containerNames).size());
// Create a deletion TX for each key.
Map<String, List<String>> containerBlocks = Maps.newHashMap();
for (KsmKeyInfo info : keyLocations.values()) {
List<KsmKeyLocationInfo> list = info.getKeyLocationList();
list.forEach(location -> {
if (containerBlocks.containsKey(location.getContainerName())) {
containerBlocks.get(location.getContainerName())
.add(location.getBlockID());
} else {
List<String> blks = Lists.newArrayList();
blks.add(location.getBlockID());
containerBlocks.put(location.getContainerName(), blks);
}
});
}
for (Map.Entry<String, List<String>> tx : containerBlocks.entrySet()) {
delLog.addTransaction(tx.getKey(), tx.getValue());
}
Map<String, List<String>> containerBlocks = createDeleteTXLog(delLog,
keyLocations, helper);
Set<String> containerNames = containerBlocks.keySet();
// Verify a few TXs get created in the TX log.
Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
@@ -268,4 +248,105 @@ public class TestStorageContainerManager {
}
}
}
}
@Test
public void testBlockDeletingThrottling() throws Exception {
int numKeys = 15;
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 5);
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 3000);
conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
numKeys);
MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.numDataNodes(1).setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.build();
DeletedBlockLog delLog = cluster.getStorageContainerManager()
.getScmBlockManager().getDeletedBlockLog();
Assert.assertEquals(0, delLog.getNumOfValidTransactions());
int limitSize = 1;
// Reset the limit value to 1, so that only one TX is processed per
// datanode.
SCMBlockDeletingService delService = cluster.getStorageContainerManager()
.getScmBlockManager().getSCMBlockDeletingService();
delService.setBlockDeleteTXNum(limitSize);
// Create {numKeys} randomly named keys.
TestStorageContainerManagerHelper helper =
new TestStorageContainerManagerHelper(cluster, conf);
Map<String, KsmKeyInfo> keyLocations = helper.createKeys(numKeys, 4096);
createDeleteTXLog(delLog, keyLocations, helper);
// Verify a few TXs get created in the TX log.
Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
// Verify that the number of TXs in the delete command is as expected.
GenericTestUtils.waitFor(() -> {
NodeManager nodeManager = cluster.getStorageContainerManager()
.getScmNodeManager();
ReportState reportState = ReportState.newBuilder()
.setState(ReportState.states.noContainerReports).setCount(0).build();
List<SCMCommand> commands = nodeManager.sendHeartbeat(
nodeManager.getNodes(NodeState.HEALTHY).get(0), null, reportState);
if (commands != null) {
for (SCMCommand cmd : commands) {
if (cmd.getType() == Type.deleteBlocksCommand) {
List<DeletedBlocksTransaction> deletedTXs =
((DeleteBlocksCommand) cmd).blocksTobeDeleted();
return deletedTXs != null && deletedTXs.size() == limitSize;
}
}
}
return false;
}, 500, 10000);
}
private Map<String, List<String>> createDeleteTXLog(DeletedBlockLog delLog,
Map<String, KsmKeyInfo> keyLocations,
TestStorageContainerManagerHelper helper) throws IOException {
// These keys will be written into a bunch of containers;
// get the set of container names and verify the container blocks
// on the datanodes.
Set<String> containerNames = new HashSet<>();
for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
entry.getValue().getKeyLocationList()
.forEach(loc -> containerNames.add(loc.getContainerName()));
}
// The total number of blocks in these containers should be equal to
// the total number of blocks created via the creation calls.
int totalCreatedBlocks = 0;
for (KsmKeyInfo info : keyLocations.values()) {
totalCreatedBlocks += info.getKeyLocationList().size();
}
Assert.assertTrue(totalCreatedBlocks > 0);
Assert.assertEquals(totalCreatedBlocks,
helper.getAllBlocks(containerNames).size());
// Create a deletion TX for each key.
Map<String, List<String>> containerBlocks = Maps.newHashMap();
for (KsmKeyInfo info : keyLocations.values()) {
List<KsmKeyLocationInfo> list = info.getKeyLocationList();
list.forEach(location -> {
if (containerBlocks.containsKey(location.getContainerName())) {
containerBlocks.get(location.getContainerName())
.add(location.getBlockID());
} else {
List<String> blks = Lists.newArrayList();
blks.add(location.getBlockID());
containerBlocks.put(location.getContainerName(), blks);
}
});
}
for (Map.Entry<String, List<String>> tx : containerBlocks.entrySet()) {
delLog.addTransaction(tx.getKey(), tx.getValue());
}
return containerBlocks;
}
}

View File

@@ -19,9 +19,14 @@ package org.apache.hadoop.ozone.scm.block;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.scm.container.ContainerMapping;
import org.apache.hadoop.ozone.scm.container.Mapping;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
@@ -29,11 +34,14 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -42,6 +50,7 @@ import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static org.mockito.Mockito.mock;
/**
* Tests for DeletedBlockLog.
@@ -237,4 +246,97 @@ public class TestDeletedBlockLog {
blocks = deletedBlockLog.getTransactions(10);
Assert.assertEquals(10, blocks.size());
}
@Test
public void testDeletedBlockTransactions() throws IOException {
int txNum = 10;
int maximumAllowedTXNum = 5;
List<DeletedBlocksTransaction> blocks = null;
List<String> containerNames = new LinkedList<>();
int count = 0;
String containerName = null;
DatanodeID dnID1 = new DatanodeID(null, null, "node1", 0, 0, 0, 0);
DatanodeID dnID2 = new DatanodeID(null, null, "node2", 0, 0, 0, 0);
Mapping mappingService = mock(ContainerMapping.class);
// Create {txNum} TXs in the log.
for (Map.Entry<String, List<String>> entry : generateData(txNum)
.entrySet()) {
count++;
containerName = entry.getKey();
containerNames.add(containerName);
deletedBlockLog.addTransaction(containerName, entry.getValue());
// make TX[1-6] for datanode1; TX[7-10] for datanode2
if (count <= (maximumAllowedTXNum + 1)) {
mockContainerInfo(mappingService, containerName, dnID1);
} else {
mockContainerInfo(mappingService, containerName, dnID2);
}
}
DatanodeDeletedBlockTransactions transactions =
new DatanodeDeletedBlockTransactions(mappingService,
maximumAllowedTXNum, 2);
deletedBlockLog.getTransactions(transactions);
List<Long> txIDs = new LinkedList<>();
for (DatanodeID dnID : transactions.getDatanodes()) {
List<DeletedBlocksTransaction> txs = transactions
.getDatanodeTransactions(dnID);
for (DeletedBlocksTransaction tx : txs) {
txIDs.add(tx.getTxID());
}
}
// Commit the scanned TX IDs so they are deleted from the log.
deletedBlockLog.commitTransactions(txIDs);
blocks = deletedBlockLog.getTransactions(txNum);
// There should be one TX remaining since dnID1 reached
// the maximum value (5).
Assert.assertEquals(1, blocks.size());
Assert.assertFalse(transactions.isFull());
// The number of TXs for dnID1 won't exceed the maximum value.
Assert.assertEquals(maximumAllowedTXNum,
transactions.getDatanodeTransactions(dnID1).size());
int size = transactions.getDatanodeTransactions(dnID2).size();
// Add a TX with a duplicated container for dnID2; this should fail.
DeletedBlocksTransaction.Builder builder =
DeletedBlocksTransaction.newBuilder();
builder.setTxID(11);
builder.setContainerName(containerName);
builder.setCount(0);
transactions.addTransaction(builder.build());
// The number of TXs for dnID2 should not change.
Assert.assertEquals(size,
transactions.getDatanodeTransactions(dnID2).size());
// Add a new TX for dnID2, then dnID2 will reach the maximum value.
containerName = "newContainer";
builder = DeletedBlocksTransaction.newBuilder();
builder.setTxID(12);
builder.setContainerName(containerName);
builder.setCount(0);
mockContainerInfo(mappingService, containerName, dnID2);
transactions.addTransaction(builder.build());
// Since all nodes are full, transactions is full.
Assert.assertTrue(transactions.isFull());
}
private void mockContainerInfo(Mapping mappingService, String containerName,
DatanodeID dnID) throws IOException {
Pipeline pipeline = new Pipeline("fake");
pipeline.addMember(dnID);
ContainerInfo.Builder builder = new ContainerInfo.Builder();
builder.setPipeline(pipeline);
builder.setContainerName(containerName);
ContainerInfo containerInfo = builder.build();
Mockito.doReturn(containerInfo).when(mappingService)
.getContainer(containerName);
}
}