HDDS-273. DeleteLog entries should be purged only after corresponding DNs commit the transaction. Contributed by Lokesh Jain.

Mukul Kumar Singh 2018-07-29 01:02:24 +05:30
parent 6b038f82da
commit feb795b58d
8 changed files with 256 additions and 206 deletions
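In short: SCM used to purge a block-deletion transaction from its log as soon as any single datanode reported success. With this change the ACK carries the container ID and the reporting datanode's UUID, SCM tracks which datanodes in the container's pipeline have committed each transaction, re-sends the transaction only to the datanodes that have not yet committed it, and deletes the log entry only once every replica has acknowledged it. A rough, self-contained sketch of that rule (class, method and variable names here are illustrative only; the real logic lands in DeletedBlockLogImpl.commitTransactions below):

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class CommitTrackingSketch {
  // txID -> datanodes that have acknowledged the delete transaction so far
  private final Map<Long, Set<UUID>> txToAckedDns = new ConcurrentHashMap<>();

  /** Record an ACK; return true only when the entry may be purged from the log. */
  public boolean ack(long txID, UUID dnId, Set<UUID> containerReplicas) {
    Set<UUID> acked =
        txToAckedDns.computeIfAbsent(txID, k -> ConcurrentHashMap.newKeySet());
    acked.add(dnId);
    // Purge only after every replica of the container has committed the TX.
    if (acked.containsAll(containerReplicas)) {
      txToAckedDns.remove(txID);
      return true;  // caller removes the entry from the persisted deletedStore
    }
    return false;   // keep the entry; it is re-sent only to the missing DNs
  }
}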

DeleteBlocksCommandHandler.java

@@ -113,8 +113,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
       DeleteBlockTransactionResult.Builder txResultBuilder =
           DeleteBlockTransactionResult.newBuilder();
       txResultBuilder.setTxID(entry.getTxID());
+      long containerId = entry.getContainerID();
       try {
-        long containerId = entry.getContainerID();
         Container cont = containerSet.getContainer(containerId);
         if (cont == null) {
           throw new StorageContainerException("Unable to find the container "
@@ -126,7 +126,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
           KeyValueContainerData containerData = (KeyValueContainerData)
               cont.getContainerData();
           deleteKeyValueContainerBlocks(containerData, entry);
-          txResultBuilder.setSuccess(true);
+          txResultBuilder.setContainerID(containerId)
+              .setSuccess(true);
           break;
         default:
           LOG.error(
@@ -136,9 +137,12 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
       } catch (IOException e) {
         LOG.warn("Failed to delete blocks for container={}, TXID={}",
             entry.getContainerID(), entry.getTxID(), e);
-        txResultBuilder.setSuccess(false);
+        txResultBuilder.setContainerID(containerId)
+            .setSuccess(false);
       }
-      resultBuilder.addResults(txResultBuilder.build());
+      resultBuilder.addResults(txResultBuilder.build())
+          .setDnId(context.getParent().getDatanodeDetails()
+              .getUuid().toString());
     });
     ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();

StorageContainerDatanodeProtocol.proto

@@ -229,9 +229,11 @@ message DeletedBlocksTransaction {
 message ContainerBlocksDeletionACKProto {
   message DeleteBlockTransactionResult {
     required int64 txID = 1;
-    required bool success = 2;
+    required int64 containerID = 2;
+    required bool success = 3;
   }
   repeated DeleteBlockTransactionResult results = 1;
+  required string dnId = 2;
 }

 // SendACK response returned by datanode to SCM, currently empty.
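For illustration, this is roughly how the extended ACK is assembled with the generated protobuf builders (the wrapping class and the field values are made up; in the real handler above the dnId comes from context.getParent().getDatanodeDetails()):

import java.util.UUID;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
    .DeleteBlockTransactionResult;

public class AckBuilderSketch {
  public static ContainerBlocksDeletionACKProto buildAck(long txId, long containerId) {
    DeleteBlockTransactionResult result = DeleteBlockTransactionResult.newBuilder()
        .setTxID(txId)
        .setContainerID(containerId)            // new field in this patch
        .setSuccess(true)
        .build();
    return ContainerBlocksDeletionACKProto.newBuilder()
        .addResults(result)
        .setDnId(UUID.randomUUID().toString())  // new field: the reporting datanode
        .build();
  }
}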

BlockManagerImpl.java

@@ -112,7 +112,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);

     // SCM block deleting transaction log and deleting service.
-    deletedBlockLog = new DeletedBlockLogImpl(conf);
+    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
     long svcInterval =
         conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
             OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,

DatanodeDeletedBlockTransactions.java

@@ -53,7 +53,8 @@ public class DatanodeDeletedBlockTransactions {
     this.nodeNum = nodeNum;
   }

-  public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
+  public void addTransaction(DeletedBlocksTransaction tx,
+      Set<UUID> dnsWithTransactionCommitted) throws IOException {
     Pipeline pipeline = null;
     try {
       pipeline = mappingService.getContainerWithPipeline(tx.getContainerID())
@@ -71,29 +72,37 @@ public class DatanodeDeletedBlockTransactions {
     for (DatanodeDetails dd : pipeline.getMachines()) {
       UUID dnID = dd.getUuid();
-      if (transactions.containsKey(dnID)) {
-        List<DeletedBlocksTransaction> txs = transactions.get(dnID);
-        if (txs != null && txs.size() < maximumAllowedTXNum) {
-          boolean hasContained = false;
-          for (DeletedBlocksTransaction t : txs) {
-            if (t.getContainerID() == tx.getContainerID()) {
-              hasContained = true;
-              break;
-            }
-          }
-          if (!hasContained) {
-            txs.add(tx);
-            currentTXNum++;
-          }
-        }
-      } else {
-        currentTXNum++;
-        transactions.put(dnID, tx);
-      }
-      SCMBlockDeletingService.LOG.debug("Transaction added: {} <- TX({})", dnID,
-          tx.getTxID());
+      if (dnsWithTransactionCommitted == null ||
+          !dnsWithTransactionCommitted.contains(dnID)) {
+        // Transaction need not be sent to dns which have already committed it
+        addTransactionToDN(dnID, tx);
+      }
     }
   }

+  private void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
+    if (transactions.containsKey(dnID)) {
+      List<DeletedBlocksTransaction> txs = transactions.get(dnID);
+      if (txs != null && txs.size() < maximumAllowedTXNum) {
+        boolean hasContained = false;
+        for (DeletedBlocksTransaction t : txs) {
+          if (t.getContainerID() == tx.getContainerID()) {
+            hasContained = true;
+            break;
+          }
+        }
+        if (!hasContained) {
+          txs.add(tx);
+          currentTXNum++;
+        }
+      }
+    } else {
+      currentTXNum++;
+      transactions.put(dnID, tx);
+    }
+    SCMBlockDeletingService.LOG
+        .debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
+  }
+
   Set<UUID> getDatanodeIDs() {
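The effect of the new dnsWithTransactionCommitted argument in isolation: on a rescan, a transaction is queued only for the replicas that have not acknowledged it yet, so a partially failed delete is retried only on the lagging datanodes. A toy, self-contained version of that guard (names are illustrative, not from the patch):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

public class ResendFilterSketch {
  // Simplified form of the guard added to addTransaction(): skip datanodes
  // that are already in the committed set (a null set means "send to all").
  static List<UUID> dnsToSend(List<UUID> pipelineDns, Set<UUID> committedDns) {
    List<UUID> pending = new ArrayList<>();
    for (UUID dn : pipelineDns) {
      if (committedDns == null || !committedDns.contains(dn)) {
        pending.add(dn);
      }
    }
    return pending;
  }

  public static void main(String[] args) {
    UUID a = UUID.randomUUID(), b = UUID.randomUUID(), c = UUID.randomUUID();
    // a and b already ACKed the TX, so only c receives it on the next scan.
    System.out.println(
        dnsToSend(Arrays.asList(a, b, c), new HashSet<>(Arrays.asList(a, b))));
  }
}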

DeletedBlockLog.java

@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdds.scm.block;

+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
+    .DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -24,6 +27,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;

 /**
  * The DeletedBlockLog is a persisted log in SCM to keep tracking
@@ -33,18 +37,6 @@ import java.util.Map;
  */
 public interface DeletedBlockLog extends Closeable {

-  /**
-   * A limit size list of transactions. Note count is the max number
-   * of TXs to return, we might not be able to always return this
-   * number. and the processCount of those transactions
-   * should be [0, MAX_RETRY).
-   *
-   * @param count - number of transactions.
-   * @return a list of BlockDeletionTransaction.
-   */
-  List<DeletedBlocksTransaction> getTransactions(int count)
-      throws IOException;
-
   /**
    * Scan entire log once and returns TXs to DatanodeDeletedBlockTransactions.
    * Once DatanodeDeletedBlockTransactions is full, the scan behavior will
@@ -81,10 +73,11 @@ public interface DeletedBlockLog extends Closeable {
    * Commits a transaction means to delete all footprints of a transaction
    * from the log. This method doesn't guarantee all transactions can be
    * successfully deleted, it tolerate failures and tries best efforts to.
-   *
-   * @param txIDs - transaction IDs.
+   * @param transactionResults - delete block transaction results.
+   * @param dnID - ID of datanode which acknowledges the delete block command.
    */
-  void commitTransactions(List<Long> txIDs) throws IOException;
+  void commitTransactions(List<DeleteBlockTransactionResult> transactionResults,
+      UUID dnID);

   /**
    * Creates a block deletion transaction and adds that into the log.

DeletedBlockLogImpl.java

@@ -21,27 +21,36 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
+    .DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.container.Mapping;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;

 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
@@ -74,12 +83,15 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
   private final int maxRetry;
   private final MetadataStore deletedStore;
+  private final Mapping containerManager;
   private final Lock lock;
   // The latest id of deleted blocks in the db.
   private long lastTxID;
-  private long lastReadTxID;
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private Map<Long, Set<UUID>> transactionToDNsCommitMap;

-  public DeletedBlockLogImpl(Configuration conf) throws IOException {
+  public DeletedBlockLogImpl(Configuration conf, Mapping containerManager)
+      throws IOException {
     maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
         OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
@@ -95,11 +107,17 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
         .setDbFile(deletedLogDbPath)
         .setCacheSize(cacheSize * OzoneConsts.MB)
         .build();
+    this.containerManager = containerManager;
     this.lock = new ReentrantLock();
     // start from the head of deleted store.
-    lastReadTxID = 0;
     lastTxID = findLatestTxIDInStore();
+    // transactionToDNsCommitMap is updated only when
+    // transaction is added to the log and when it is removed.
+    // maps transaction to dns which have committed it.
+    transactionToDNsCommitMap = new ConcurrentHashMap<>();
   }

   @VisibleForTesting
@@ -123,39 +141,6 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
     return txid;
   }

-  @Override
-  public List<DeletedBlocksTransaction> getTransactions(
-      int count) throws IOException {
-    List<DeletedBlocksTransaction> result = new ArrayList<>();
-    MetadataKeyFilter getNextTxID = (preKey, currentKey, nextKey)
-        -> Longs.fromByteArray(currentKey) > lastReadTxID;
-    MetadataKeyFilter avoidInvalidTxid = (preKey, currentKey, nextKey)
-        -> !Arrays.equals(LATEST_TXID, currentKey);
-    lock.lock();
-    try {
-      deletedStore.iterate(null, (key, value) -> {
-        if (getNextTxID.filterKey(null, key, null) &&
-            avoidInvalidTxid.filterKey(null, key, null)) {
-          DeletedBlocksTransaction block = DeletedBlocksTransaction
-              .parseFrom(value);
-          if (block.getCount() > -1 && block.getCount() <= maxRetry) {
-            result.add(block);
-          }
-        }
-        return result.size() < count;
-      });
-      // Scan the metadata from the beginning.
-      if (result.size() < count || result.size() < 1) {
-        lastReadTxID = 0;
-      } else {
-        lastReadTxID = result.get(result.size() - 1).getTxID();
-      }
-    } finally {
-      lock.unlock();
-    }
-    return result;
-  }
-
   @Override
   public List<DeletedBlocksTransaction> getFailedTransactions()
       throws IOException {
@@ -235,18 +220,50 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
   /**
    * {@inheritDoc}
    *
-   * @param txIDs - transaction IDs.
+   * @param transactionResults - transaction IDs.
+   * @param dnID - Id of Datanode which has acknowledged a delete block command.
    * @throws IOException
    */
   @Override
-  public void commitTransactions(List<Long> txIDs) throws IOException {
+  public void commitTransactions(
+      List<DeleteBlockTransactionResult> transactionResults, UUID dnID) {
     lock.lock();
     try {
-      for (Long txID : txIDs) {
+      Set<UUID> dnsWithCommittedTxn;
+      for (DeleteBlockTransactionResult transactionResult : transactionResults) {
+        if (isTransactionFailed(transactionResult)) {
+          continue;
+        }
         try {
-          deletedStore.delete(Longs.toByteArray(txID));
-        } catch (IOException ex) {
-          LOG.warn("Cannot commit txID " + txID, ex);
+          long txID = transactionResult.getTxID();
+          // set of dns which have successfully committed transaction txId.
+          dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
+          Long containerId = transactionResult.getContainerID();
+          if (dnsWithCommittedTxn == null || containerId == null) {
+            LOG.warn("Transaction txId={} commit by dnId={} failed."
+                + " Corresponding entry not found.", txID, dnID);
+            return;
+          }
+          dnsWithCommittedTxn.add(dnID);
+          Collection<DatanodeDetails> containerDnsDetails =
+              containerManager.getContainerWithPipeline(containerId)
+                  .getPipeline().getDatanodes().values();
+          // The delete entry can be safely removed from the log if all the
+          // corresponding nodes commit the txn.
+          if (dnsWithCommittedTxn.size() >= containerDnsDetails.size()) {
+            List<UUID> containerDns = containerDnsDetails.stream()
+                .map(dnDetails -> dnDetails.getUuid())
+                .collect(Collectors.toList());
+            if (dnsWithCommittedTxn.containsAll(containerDns)) {
+              transactionToDNsCommitMap.remove(txID);
+              LOG.debug("Purging txId={} from block deletion log", txID);
+              deletedStore.delete(Longs.toByteArray(txID));
+            }
+          }
+        } catch (IOException e) {
+          LOG.warn("Could not commit delete block transaction: " +
+              transactionResult.getTxID(), e);
         }
       }
     } finally {
@@ -254,6 +271,20 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
     }
   }

+  private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Got block deletion ACK from datanode, TXIDs={}, " + "success={}",
+          result.getTxID(), result.getSuccess());
+    }
+    if (!result.getSuccess()) {
+      LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
+          + "TX in next interval", result.getTxID());
+      return true;
+    }
+    return false;
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -355,7 +386,9 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
               .parseFrom(value);
           if (block.getCount() > -1 && block.getCount() <= maxRetry) {
-            transactions.addTransaction(block);
+            Set<UUID> dnsWithTransactionCommitted = transactionToDNsCommitMap
+                .putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
+            transactions.addTransaction(block, dnsWithTransactionCommitted);
           }
           return !transactions.isFull();
         }
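One subtlety in the hunk above: Map.putIfAbsent returns the previous value, so on the first scan of a transaction dnsWithTransactionCommitted is null (which addTransaction treats as "send to every datanode"); only on later scans does it return the set of datanodes that have already committed. A small standalone demo of that return-value behaviour (using a JDK concurrent set here instead of the Jetty ConcurrentHashSet used in the patch):

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class PutIfAbsentDemo {
  public static void main(String[] args) {
    Map<Long, Set<UUID>> txToDns = new ConcurrentHashMap<>();

    // First scan: no mapping yet, so putIfAbsent stores the empty set and
    // returns null, and the TX is queued for every datanode in the pipeline.
    Set<UUID> first = txToDns.putIfAbsent(1L, ConcurrentHashMap.newKeySet());
    System.out.println(first);                 // null

    // A datanode ACK is recorded into the stored set in the meantime.
    txToDns.get(1L).add(UUID.randomUUID());

    // Later scans: putIfAbsent returns the existing set, so addTransaction
    // can skip the datanodes that already committed this transaction.
    Set<UUID> committed = txToDns.putIfAbsent(1L, ConcurrentHashMap.newKeySet());
    System.out.println(committed.size());      // 1
  }
}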

SCMDatanodeProtocolServer.java

@@ -91,9 +91,9 @@ import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;

 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
@@ -230,21 +230,8 @@ public class SCMDatanodeProtocolServer implements
       ContainerBlocksDeletionACKProto acks) throws IOException {
     if (acks.getResultsCount() > 0) {
       List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
-      for (DeleteBlockTransactionResult result : resultList) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
-              + "success={}", result.getTxID(), result.getSuccess());
-        }
-        if (result.getSuccess()) {
-          LOG.debug("Purging TXID={} from block deletion log",
-              result.getTxID());
-          scm.getScmBlockManager().getDeletedBlockLog()
-              .commitTransactions(Collections.singletonList(result.getTxID()));
-        } else {
-          LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
-              + "TX in next interval", result.getTxID());
-        }
-      }
+      scm.getScmBlockManager().getDeletedBlockLog()
+          .commitTransactions(resultList, UUID.fromString(acks.getDnId()));
     }
     return ContainerBlocksDeletionACKResponseProto.newBuilder()
         .getDefaultInstanceForType();

TestDeletedBlockLog.java

@@ -32,6 +32,9 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
+    .DeleteBlockTransactionResult;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataStore;
@@ -45,6 +48,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -56,7 +60,8 @@ import java.util.stream.Collectors;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.when;

 /**
  * Tests for DeletedBlockLog.
@@ -66,6 +71,8 @@ public class TestDeletedBlockLog {
   private static DeletedBlockLogImpl deletedBlockLog;
   private OzoneConfiguration conf;
   private File testDir;
+  private Mapping containerManager;
+  private List<DatanodeDetails> dnList;

   @Before
   public void setup() throws Exception {
@@ -74,7 +81,36 @@ public class TestDeletedBlockLog {
     conf = new OzoneConfiguration();
     conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
     conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
-    deletedBlockLog = new DeletedBlockLogImpl(conf);
+    containerManager = Mockito.mock(ContainerMapping.class);
+    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
+    dnList = new ArrayList<>(3);
+    setupContainerManager();
+  }
+
+  private void setupContainerManager() throws IOException {
+    dnList.add(
+        DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+            .build());
+    dnList.add(
+        DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+            .build());
+    dnList.add(
+        DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+            .build());
+    ContainerInfo containerInfo =
+        new ContainerInfo.Builder().setContainerID(1).build();
+    Pipeline pipeline =
+        new Pipeline(null, LifeCycleState.CLOSED, ReplicationType.RATIS,
+            ReplicationFactor.THREE, null);
+    pipeline.addMember(dnList.get(0));
+    pipeline.addMember(dnList.get(1));
+    pipeline.addMember(dnList.get(2));
+    ContainerWithPipeline containerWithPipeline =
+        new ContainerWithPipeline(containerInfo, pipeline);
+    when(containerManager.getContainerWithPipeline(anyLong()))
+        .thenReturn(containerWithPipeline);
+    when(containerManager.getContainer(anyLong())).thenReturn(containerInfo);
   }

   @After
@@ -101,45 +137,50 @@ public class TestDeletedBlockLog {
     return blockMap;
   }

-  @Test
-  public void testGetTransactions() throws Exception {
-    List<DeletedBlocksTransaction> blocks =
-        deletedBlockLog.getTransactions(30);
-    Assert.assertEquals(0, blocks.size());
-
-    // Creates 40 TX in the log.
-    for (Map.Entry<Long, List<Long>> entry : generateData(40).entrySet()){
-      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
-    }
-
-    // Get first 30 TXs.
-    blocks = deletedBlockLog.getTransactions(30);
-    Assert.assertEquals(30, blocks.size());
-    for (int i = 0; i < 30; i++) {
-      Assert.assertEquals(i + 1, blocks.get(i).getTxID());
-    }
-
-    // Get another 30 TXs.
-    // The log only 10 left, so this time it will only return 10 TXs.
-    blocks = deletedBlockLog.getTransactions(30);
-    Assert.assertEquals(10, blocks.size());
-    for (int i = 30; i < 40; i++) {
-      Assert.assertEquals(i + 1, blocks.get(i - 30).getTxID());
-    }
-
-    // Get another 50 TXs.
-    // By now the position should have moved to the beginning,
-    // this call will return all 40 TXs.
-    blocks = deletedBlockLog.getTransactions(50);
-    Assert.assertEquals(40, blocks.size());
-    for (int i = 0; i < 40; i++) {
-      Assert.assertEquals(i + 1, blocks.get(i).getTxID());
-    }
-    List<Long> txIDs = new ArrayList<>();
-    for (DeletedBlocksTransaction block : blocks) {
-      txIDs.add(block.getTxID());
-    }
-    deletedBlockLog.commitTransactions(txIDs);
+  private void commitTransactions(
+      List<DeleteBlockTransactionResult> transactionResults,
+      DatanodeDetails... dns) {
+    for (DatanodeDetails dnDetails : dns) {
+      deletedBlockLog
+          .commitTransactions(transactionResults, dnDetails.getUuid());
+    }
+  }
+
+  private void commitTransactions(
+      List<DeleteBlockTransactionResult> transactionResults) {
+    commitTransactions(transactionResults,
+        dnList.toArray(new DatanodeDetails[3]));
+  }
+
+  private void commitTransactions(
+      Collection<DeletedBlocksTransaction> deletedBlocksTransactions,
+      DatanodeDetails... dns) {
+    commitTransactions(deletedBlocksTransactions.stream()
+        .map(this::createDeleteBlockTransactionResult)
+        .collect(Collectors.toList()), dns);
+  }
+
+  private void commitTransactions(
+      Collection<DeletedBlocksTransaction> deletedBlocksTransactions) {
+    commitTransactions(deletedBlocksTransactions.stream()
+        .map(this::createDeleteBlockTransactionResult)
+        .collect(Collectors.toList()));
+  }
+
+  private DeleteBlockTransactionResult createDeleteBlockTransactionResult(
+      DeletedBlocksTransaction transaction) {
+    return DeleteBlockTransactionResult.newBuilder()
+        .setContainerID(transaction.getContainerID()).setSuccess(true)
+        .setTxID(transaction.getTxID()).build();
+  }
+
+  private List<DeletedBlocksTransaction> getTransactions(
+      int maximumAllowedTXNum) throws IOException {
+    DatanodeDeletedBlockTransactions transactions =
+        new DatanodeDeletedBlockTransactions(containerManager,
+            maximumAllowedTXNum, 3);
+    deletedBlockLog.getTransactions(transactions);
+    return transactions.getDatanodeTransactions(dnList.get(0).getUuid());
   }

   @Test
@@ -153,7 +194,7 @@ public class TestDeletedBlockLog {
     // This will return all TXs, total num 30.
     List<DeletedBlocksTransaction> blocks =
-        deletedBlockLog.getTransactions(40);
+        getTransactions(40);
     List<Long> txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID)
         .collect(Collectors.toList());
@@ -164,13 +205,13 @@ public class TestDeletedBlockLog {
     // Increment another time so it exceed the maxRetry.
     // On this call, count will be set to -1 which means TX eventually fails.
     deletedBlockLog.incrementCount(txIDs);
-    blocks = deletedBlockLog.getTransactions(40);
+    blocks = getTransactions(40);
     for (DeletedBlocksTransaction block : blocks) {
       Assert.assertEquals(-1, block.getCount());
     }

     // If all TXs are failed, getTransactions call will always return nothing.
-    blocks = deletedBlockLog.getTransactions(40);
+    blocks = getTransactions(40);
     Assert.assertEquals(blocks.size(), 0);
   }
@@ -180,16 +221,26 @@ public class TestDeletedBlockLog {
       deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
     }
     List<DeletedBlocksTransaction> blocks =
-        deletedBlockLog.getTransactions(20);
-    List<Long> txIDs = new ArrayList<>();
-    for (DeletedBlocksTransaction block : blocks) {
-      txIDs.add(block.getTxID());
-    }
-    // Add an invalid txID.
-    txIDs.add(70L);
-    deletedBlockLog.commitTransactions(txIDs);
-    blocks = deletedBlockLog.getTransactions(50);
+        getTransactions(20);
+    // Add an invalid txn.
+    blocks.add(
+        DeletedBlocksTransaction.newBuilder().setContainerID(1).setTxID(70)
+            .setCount(0).addLocalID(0).build());
+    commitTransactions(blocks);
+    blocks.remove(blocks.size() - 1);
+
+    blocks = getTransactions(50);
     Assert.assertEquals(30, blocks.size());
+    commitTransactions(blocks, dnList.get(1), dnList.get(2),
+        DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
+            .build());
+    blocks = getTransactions(50);
+    Assert.assertEquals(30, blocks.size());
+
+    commitTransactions(blocks, dnList.get(0));
+    blocks = getTransactions(50);
+    Assert.assertEquals(0, blocks.size());
   }

   @Test
@@ -213,20 +264,16 @@ public class TestDeletedBlockLog {
         }
         added += 10;
       } else if (state == 1) {
-        blocks = deletedBlockLog.getTransactions(20);
+        blocks = getTransactions(20);
         txIDs = new ArrayList<>();
         for (DeletedBlocksTransaction block : blocks) {
           txIDs.add(block.getTxID());
         }
         deletedBlockLog.incrementCount(txIDs);
       } else if (state == 2) {
-        txIDs = new ArrayList<>();
-        for (DeletedBlocksTransaction block : blocks) {
-          txIDs.add(block.getTxID());
-        }
+        commitTransactions(blocks);
+        committed += blocks.size();
         blocks = new ArrayList<>();
-        committed += txIDs.size();
-        deletedBlockLog.commitTransactions(txIDs);
       } else {
         // verify the number of added and committed.
         List<Map.Entry<byte[], byte[]>> result =
@@ -234,6 +281,8 @@ public class TestDeletedBlockLog {
         Assert.assertEquals(added, result.size() + committed);
       }
     }
+    blocks = getTransactions(1000);
+    commitTransactions(blocks);
   }

   @Test
@@ -244,16 +293,13 @@ public class TestDeletedBlockLog {
     // close db and reopen it again to make sure
     // transactions are stored persistently.
     deletedBlockLog.close();
-    deletedBlockLog = new DeletedBlockLogImpl(conf);
+    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
     List<DeletedBlocksTransaction> blocks =
-        deletedBlockLog.getTransactions(10);
-    List<Long> txIDs = new ArrayList<>();
-    for (DeletedBlocksTransaction block : blocks) {
-      txIDs.add(block.getTxID());
-    }
-    deletedBlockLog.commitTransactions(txIDs);
-    blocks = deletedBlockLog.getTransactions(10);
-    Assert.assertEquals(10, blocks.size());
+        getTransactions(10);
+    commitTransactions(blocks);
+    blocks = getTransactions(100);
+    Assert.assertEquals(40, blocks.size());
+    commitTransactions(blocks);
   }

   @Test
@@ -262,32 +308,11 @@ public class TestDeletedBlockLog {
     int maximumAllowedTXNum = 5;
     List<DeletedBlocksTransaction> blocks = null;
     List<Long> containerIDs = new LinkedList<>();
+    DatanodeDetails dnId1 = dnList.get(0), dnId2 = dnList.get(1);

     int count = 0;
     long containerID = 0L;
-    DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
-        DatanodeDetails.Port.Name.STANDALONE, 0);
-    DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
-        DatanodeDetails.Port.Name.RATIS, 0);
-    DatanodeDetails.Port restPort = DatanodeDetails.newPort(
-        DatanodeDetails.Port.Name.REST, 0);
-    DatanodeDetails dnId1 = DatanodeDetails.newBuilder()
-        .setUuid(UUID.randomUUID().toString())
-        .setIpAddress("127.0.0.1")
-        .setHostName("localhost")
-        .addPort(containerPort)
-        .addPort(ratisPort)
-        .addPort(restPort)
-        .build();
-    DatanodeDetails dnId2 = DatanodeDetails.newBuilder()
-        .setUuid(UUID.randomUUID().toString())
-        .setIpAddress("127.0.0.1")
-        .setHostName("localhost")
-        .addPort(containerPort)
-        .addPort(ratisPort)
-        .addPort(restPort)
-        .build();
-    Mapping mappingService = mock(ContainerMapping.class);

     // Creates {TXNum} TX in the log.
     for (Map.Entry<Long, List<Long>> entry : generateData(txNum)
         .entrySet()) {
@@ -298,29 +323,25 @@ public class TestDeletedBlockLog {
       // make TX[1-6] for datanode1; TX[7-10] for datanode2
       if (count <= (maximumAllowedTXNum + 1)) {
-        mockContainerInfo(mappingService, containerID, dnId1);
+        mockContainerInfo(containerID, dnId1);
       } else {
-        mockContainerInfo(mappingService, containerID, dnId2);
+        mockContainerInfo(containerID, dnId2);
       }
     }

     DatanodeDeletedBlockTransactions transactions =
-        new DatanodeDeletedBlockTransactions(mappingService,
+        new DatanodeDeletedBlockTransactions(containerManager,
            maximumAllowedTXNum, 2);
     deletedBlockLog.getTransactions(transactions);

-    List<Long> txIDs = new LinkedList<>();
     for (UUID id : transactions.getDatanodeIDs()) {
       List<DeletedBlocksTransaction> txs = transactions
           .getDatanodeTransactions(id);
-      for (DeletedBlocksTransaction tx : txs) {
-        txIDs.add(tx.getTxID());
-      }
+      // delete TX ID
+      commitTransactions(txs);
     }

-    // delete TX ID
-    deletedBlockLog.commitTransactions(txIDs);
-    blocks = deletedBlockLog.getTransactions(txNum);
+    blocks = getTransactions(txNum);
     // There should be one block remained since dnID1 reaches
     // the maximum value (5).
     Assert.assertEquals(1, blocks.size());
@@ -337,7 +358,8 @@ public class TestDeletedBlockLog {
     builder.setTxID(11);
     builder.setContainerID(containerID);
     builder.setCount(0);
-    transactions.addTransaction(builder.build());
+    transactions.addTransaction(builder.build(),
+        null);

     // The number of TX in dnID2 should not be changed.
     Assert.assertEquals(size,
@@ -349,14 +371,14 @@ public class TestDeletedBlockLog {
     builder.setTxID(12);
     builder.setContainerID(containerID);
     builder.setCount(0);
-    mockContainerInfo(mappingService, containerID, dnId2);
-    transactions.addTransaction(builder.build());
+    mockContainerInfo(containerID, dnId2);
+    transactions.addTransaction(builder.build(),
+        null);
     // Since all node are full, then transactions is full.
     Assert.assertTrue(transactions.isFull());
   }

-  private void mockContainerInfo(Mapping mappingService, long containerID,
-      DatanodeDetails dd) throws IOException {
+  private void mockContainerInfo(long containerID, DatanodeDetails dd) throws IOException {
     Pipeline pipeline =
         new Pipeline("fake", LifeCycleState.OPEN,
             ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake");
@@ -370,9 +392,9 @@ public class TestDeletedBlockLog {
     ContainerInfo containerInfo = builder.build();
     ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline(
         containerInfo, pipeline);
-    Mockito.doReturn(containerInfo).when(mappingService)
+    Mockito.doReturn(containerInfo).when(containerManager)
         .getContainer(containerID);
-    Mockito.doReturn(containerWithPipeline).when(mappingService)
+    Mockito.doReturn(containerWithPipeline).when(containerManager)
         .getContainerWithPipeline(containerID);
   }
 }