HDFS-12283. Ozone: DeleteKey-5: Implement SCM DeletedBlockLog. Contributed by Yuanbo Liu.
This commit is contained in:
parent
9c789e883d
commit
91ffbaa8b9
|
@ -80,6 +80,7 @@ public final class OzoneConsts {
|
||||||
public static final String BLOCK_DB = "block.db";
|
public static final String BLOCK_DB = "block.db";
|
||||||
public static final String NODEPOOL_DB = "nodepool.db";
|
public static final String NODEPOOL_DB = "nodepool.db";
|
||||||
public static final String OPEN_CONTAINERS_DB = "openContainers.db";
|
public static final String OPEN_CONTAINERS_DB = "openContainers.db";
|
||||||
|
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
|
||||||
public static final String KSM_DB_NAME = "ksm.db";
|
public static final String KSM_DB_NAME = "ksm.db";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -212,6 +212,9 @@ public final class ScmConfigKeys {
|
||||||
public static final int OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT =
|
public static final int OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT =
|
||||||
300; // Default 5 minute wait.
|
300; // Default 5 minute wait.
|
||||||
|
|
||||||
|
public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
|
||||||
|
"ozone.scm.block.deletion.max.retry";
|
||||||
|
public static final int OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT = 4096;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Never constructed.
|
* Never constructed.
|
||||||
|
|
|
@ -88,6 +88,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
||||||
|
|
||||||
// Track all containers owned by block service.
|
// Track all containers owned by block service.
|
||||||
private final MetadataStore containerStore;
|
private final MetadataStore containerStore;
|
||||||
|
private final DeletedBlockLog deletedBlockLog;
|
||||||
|
|
||||||
private Map<OzoneProtos.LifeCycleState,
|
private Map<OzoneProtos.LifeCycleState,
|
||||||
Map<String, BlockContainerInfo>> containers;
|
Map<String, BlockContainerInfo>> containers;
|
||||||
|
@ -142,6 +143,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
||||||
this.lock = new ReentrantLock();
|
this.lock = new ReentrantLock();
|
||||||
|
|
||||||
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
|
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
|
||||||
|
deletedBlockLog = new DeletedBlockLogImpl(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: close full (or almost full) containers with a separate thread.
|
// TODO: close full (or almost full) containers with a separate thread.
|
||||||
|
@ -490,7 +492,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
||||||
if (containerStore != null) {
|
if (containerStore != null) {
|
||||||
containerStore.close();
|
containerStore.close();
|
||||||
}
|
}
|
||||||
|
if (deletedBlockLog != null) {
|
||||||
|
deletedBlockLog.close();
|
||||||
|
}
|
||||||
MBeans.unregister(mxBean);
|
MBeans.unregister(mxBean);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.scm.block;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The DeletedBlockLog is a persisted log in SCM to keep tracking
|
||||||
|
* container blocks which are under deletion. It maintains info
|
||||||
|
* about under-deletion container blocks that notified by KSM,
|
||||||
|
* and the state how it is processed.
|
||||||
|
*/
|
||||||
|
public interface DeletedBlockLog extends Closeable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A limit size list of transactions. Note count is the max number
|
||||||
|
* of TXs to return, we might not be able to always return this
|
||||||
|
* number. and the processCount of those transactions
|
||||||
|
* should be [0, MAX_RETRY).
|
||||||
|
*
|
||||||
|
* @param count - number of transactions.
|
||||||
|
* @return a list of BlockDeletionTransaction.
|
||||||
|
*/
|
||||||
|
List<DeletedBlocksTransaction> getTransactions(int count)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increments count for given list of transactions by 1.
|
||||||
|
* The log maintains a valid range of counts for each transaction
|
||||||
|
* [0, MAX_RETRY]. If exceed this range, resets it to -1 to indicate
|
||||||
|
* the transaction is no longer valid.
|
||||||
|
*
|
||||||
|
* @param txIDs - transaction ID.
|
||||||
|
*/
|
||||||
|
void incrementCount(List<Long> txIDs)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Commits a transaction means to delete all footprints of a transaction
|
||||||
|
* from the log. This method doesn't guarantee all transactions can be
|
||||||
|
* successfully deleted, it tolerate failures and tries best efforts to.
|
||||||
|
*
|
||||||
|
* @param txIDs - transaction IDs.
|
||||||
|
*/
|
||||||
|
void commitTransactions(List<Long> txIDs) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a block deletion transaction and adds that into the log.
|
||||||
|
*
|
||||||
|
* @param containerName - container name.
|
||||||
|
* @param blocks - blocks that belong to the same container.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void addTransaction(String containerName, List<String> blocks)
|
||||||
|
throws IOException;
|
||||||
|
}
|
|
@ -0,0 +1,246 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.scm.block;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.primitives.Longs;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||||
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
|
import org.apache.hadoop.utils.BatchOperation;
|
||||||
|
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
|
||||||
|
import org.apache.hadoop.utils.MetadataStore;
|
||||||
|
import org.apache.hadoop.utils.MetadataStoreBuilder;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;
|
||||||
|
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
|
||||||
|
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
|
||||||
|
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A implement class of {@link DeletedBlockLog}, and it uses
|
||||||
|
* K/V db to maintain block deletion transactions between scm and datanode.
|
||||||
|
* This is a very basic implementation, it simply scans the log and
|
||||||
|
* memorize the position that scanned by last time, and uses this to
|
||||||
|
* determine where the next scan starts. It has no notion about weight
|
||||||
|
* of each transaction so as long as transaction is still valid, they get
|
||||||
|
* equally same chance to be retrieved which only depends on the nature
|
||||||
|
* order of the transaction ID.
|
||||||
|
*/
|
||||||
|
public class DeletedBlockLogImpl implements DeletedBlockLog {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(DeletedBlockLogImpl.class);
|
||||||
|
|
||||||
|
private static final byte[] LATEST_TXID =
|
||||||
|
DFSUtil.string2Bytes("#LATEST_TXID#");
|
||||||
|
|
||||||
|
private final int maxRetry;
|
||||||
|
private final MetadataStore deletedStore;
|
||||||
|
private final Lock lock;
|
||||||
|
// The latest id of deleted blocks in the db.
|
||||||
|
private long lastTxID;
|
||||||
|
private long lastReadTxID;
|
||||||
|
|
||||||
|
public DeletedBlockLogImpl(Configuration conf) throws IOException {
|
||||||
|
maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
|
||||||
|
OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
|
||||||
|
|
||||||
|
File metaDir = OzoneUtils.getScmMetadirPath(conf);
|
||||||
|
String scmMetaDataDir = metaDir.getPath();
|
||||||
|
File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB);
|
||||||
|
int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
|
||||||
|
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
|
||||||
|
// Load store of all transactions.
|
||||||
|
deletedStore = MetadataStoreBuilder.newBuilder()
|
||||||
|
.setCreateIfMissing(true)
|
||||||
|
.setConf(conf)
|
||||||
|
.setDbFile(deletedLogDbPath)
|
||||||
|
.setCacheSize(cacheSize * OzoneConsts.MB)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
this.lock = new ReentrantLock();
|
||||||
|
// start from the head of deleted store.
|
||||||
|
lastReadTxID = 0;
|
||||||
|
lastTxID = findLatestTxIDInStore();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
MetadataStore getDeletedStore() {
|
||||||
|
return deletedStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* There is no need to lock before reading because
|
||||||
|
* it's only used in construct method.
|
||||||
|
*
|
||||||
|
* @return latest txid.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private long findLatestTxIDInStore() throws IOException {
|
||||||
|
long txid = 0;
|
||||||
|
byte[] value = deletedStore.get(LATEST_TXID);
|
||||||
|
if (value != null) {
|
||||||
|
txid = Longs.fromByteArray(value);
|
||||||
|
}
|
||||||
|
return txid;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<DeletedBlocksTransaction> getTransactions(
|
||||||
|
int count) throws IOException {
|
||||||
|
List<DeletedBlocksTransaction> result = new ArrayList<>();
|
||||||
|
MetadataKeyFilter getNextTxID = (preKey, currentKey, nextKey)
|
||||||
|
-> Longs.fromByteArray(currentKey) > lastReadTxID;
|
||||||
|
MetadataKeyFilter avoidInvalidTxid = (preKey, currentKey, nextKey)
|
||||||
|
-> !Arrays.equals(LATEST_TXID, currentKey);
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
deletedStore.iterate(null, (key, value) -> {
|
||||||
|
if (getNextTxID.filterKey(null, key, null) &&
|
||||||
|
avoidInvalidTxid.filterKey(null, key, null)) {
|
||||||
|
DeletedBlocksTransaction block = DeletedBlocksTransaction
|
||||||
|
.parseFrom(value);
|
||||||
|
if (block.getCount() > -1 && block.getCount() <= maxRetry) {
|
||||||
|
result.add(block);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result.size() < count;
|
||||||
|
});
|
||||||
|
// Scan the metadata from the beginning.
|
||||||
|
if (result.size() < count || result.size() < 1) {
|
||||||
|
lastReadTxID = 0;
|
||||||
|
} else {
|
||||||
|
lastReadTxID = result.get(result.size() - 1).getTxID();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @param txIDs - transaction ID.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void incrementCount(List<Long> txIDs) throws IOException {
|
||||||
|
BatchOperation batch = new BatchOperation();
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
for(Long txID : txIDs) {
|
||||||
|
try {
|
||||||
|
DeletedBlocksTransaction block = DeletedBlocksTransaction
|
||||||
|
.parseFrom(deletedStore.get(Longs.toByteArray(txID)));
|
||||||
|
DeletedBlocksTransaction.Builder builder = block.toBuilder();
|
||||||
|
if (block.getCount() > -1) {
|
||||||
|
builder.setCount(block.getCount() + 1);
|
||||||
|
}
|
||||||
|
// if the retry time exceeds the maxRetry value
|
||||||
|
// then set the retry value to -1, stop retrying, admins can
|
||||||
|
// analyze those blocks and purge them manually by SCMCli.
|
||||||
|
if (block.getCount() > maxRetry) {
|
||||||
|
builder.setCount(-1);
|
||||||
|
}
|
||||||
|
deletedStore.put(Longs.toByteArray(txID),
|
||||||
|
builder.build().toByteArray());
|
||||||
|
} catch (IOException ex) {
|
||||||
|
LOG.warn("Cannot increase count for txID " + txID, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
deletedStore.writeBatch(batch);
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @param txIDs - transaction IDs.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void commitTransactions(List<Long> txIDs) throws IOException {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
for (Long txID : txIDs) {
|
||||||
|
try {
|
||||||
|
deletedStore.delete(Longs.toByteArray(txID));
|
||||||
|
} catch (IOException ex) {
|
||||||
|
LOG.warn("Cannot commit txID " + txID, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @param containerName - container name.
|
||||||
|
* @param blocks - blocks that belong to the same container.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void addTransaction(String containerName, List<String> blocks)
|
||||||
|
throws IOException {
|
||||||
|
BatchOperation batch = new BatchOperation();
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
|
||||||
|
.setTxID(lastTxID + 1)
|
||||||
|
.setContainerName(containerName)
|
||||||
|
.addAllBlockID(blocks)
|
||||||
|
.setCount(0)
|
||||||
|
.build();
|
||||||
|
byte[] key = Longs.toByteArray(lastTxID + 1);
|
||||||
|
|
||||||
|
batch.put(key, tx.toByteArray());
|
||||||
|
batch.put(LATEST_TXID, Longs.toByteArray(lastTxID + 1));
|
||||||
|
|
||||||
|
deletedStore.writeBatch(batch);
|
||||||
|
lastTxID += 1;
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (deletedStore != null) {
|
||||||
|
deletedStore.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -99,6 +99,15 @@ message ContainerInfo {
|
||||||
optional int64 keycount = 4;
|
optional int64 keycount = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The deleted blocks which are stored in deletedBlock.db of scm.
|
||||||
|
message DeletedBlocksTransaction {
|
||||||
|
required int64 txID = 1;
|
||||||
|
required string containerName = 2;
|
||||||
|
repeated string blockID = 3;
|
||||||
|
// the retry time of sending deleting command to datanode.
|
||||||
|
required int32 count = 4;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
A set of container reports, max count is generally set to
|
A set of container reports, max count is generally set to
|
||||||
8192 since that keeps the size of the reports under 1 MB.
|
8192 since that keeps the size of the reports under 1 MB.
|
||||||
|
|
|
@ -264,6 +264,19 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>ozone.scm.block.deletion.max.retry</name>
|
||||||
|
<value>4096</value>
|
||||||
|
<description>
|
||||||
|
SCM wraps up a number of blocks in a deletion transaction and send that
|
||||||
|
to datanode for physically deletion periodically. This property
|
||||||
|
determines how many times at most for SCM to retry sending a deletion
|
||||||
|
transaction to datanode. The default value 4096 is relatively big so
|
||||||
|
that SCM could try enough times before giving up, as the actual deletion
|
||||||
|
is async so time required is unpredictable.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>ozone.scm.heartbeat.log.warn.interval.count</name>
|
<name>ozone.scm.heartbeat.log.warn.interval.count</name>
|
||||||
<value>10</value>
|
<value>10</value>
|
||||||
|
|
|
@ -0,0 +1,240 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.scm.block;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.utils.MetadataKeyFilters;
|
||||||
|
import org.apache.hadoop.utils.MetadataStore;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS;
|
||||||
|
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for DeletedBlockLog.
|
||||||
|
*/
|
||||||
|
public class TestDeletedBlockLog {
|
||||||
|
|
||||||
|
private static DeletedBlockLogImpl deletedBlockLog;
|
||||||
|
private OzoneConfiguration conf;
|
||||||
|
private File testDir;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
testDir = GenericTestUtils.getTestDir(
|
||||||
|
TestDeletedBlockLog.class.getSimpleName());
|
||||||
|
conf = new OzoneConfiguration();
|
||||||
|
conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
|
||||||
|
conf.set(OZONE_CONTAINER_METADATA_DIRS, testDir.getAbsolutePath());
|
||||||
|
deletedBlockLog = new DeletedBlockLogImpl(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
deletedBlockLog.close();
|
||||||
|
FileUtils.deleteDirectory(testDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, List<String>> generateData(int dataSize) {
|
||||||
|
Map<String, List<String>> blockMap = new HashMap<>();
|
||||||
|
Random random = new Random(1);
|
||||||
|
for (int i = 0; i < dataSize; i++) {
|
||||||
|
String containerName = "container-" + UUID.randomUUID().toString();
|
||||||
|
List<String> blocks = new ArrayList<>();
|
||||||
|
int blockSize = random.nextInt(30) + 1;
|
||||||
|
for (int j = 0; j < blockSize; j++) {
|
||||||
|
blocks.add("block-" + UUID.randomUUID().toString());
|
||||||
|
}
|
||||||
|
blockMap.put(containerName, blocks);
|
||||||
|
}
|
||||||
|
return blockMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetTransactions() throws Exception {
|
||||||
|
List<DeletedBlocksTransaction> blocks =
|
||||||
|
deletedBlockLog.getTransactions(30);
|
||||||
|
Assert.assertEquals(0, blocks.size());
|
||||||
|
|
||||||
|
// Creates 40 TX in the log.
|
||||||
|
for (Map.Entry<String, List<String>> entry : generateData(40).entrySet()){
|
||||||
|
deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get first 30 TXs.
|
||||||
|
blocks = deletedBlockLog.getTransactions(30);
|
||||||
|
Assert.assertEquals(30, blocks.size());
|
||||||
|
for (int i = 0; i < 30; i++) {
|
||||||
|
Assert.assertEquals(i + 1, blocks.get(i).getTxID());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get another 30 TXs.
|
||||||
|
// The log only 10 left, so this time it will only return 10 TXs.
|
||||||
|
blocks = deletedBlockLog.getTransactions(30);
|
||||||
|
Assert.assertEquals(10, blocks.size());
|
||||||
|
for (int i = 30; i < 40; i++) {
|
||||||
|
Assert.assertEquals(i + 1, blocks.get(i - 30).getTxID());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get another 50 TXs.
|
||||||
|
// By now the position should have moved to the beginning,
|
||||||
|
// this call will return all 40 TXs.
|
||||||
|
blocks = deletedBlockLog.getTransactions(50);
|
||||||
|
Assert.assertEquals(40, blocks.size());
|
||||||
|
for (int i = 0; i < 40; i++) {
|
||||||
|
Assert.assertEquals(i + 1, blocks.get(i).getTxID());
|
||||||
|
}
|
||||||
|
List<Long> txIDs = new ArrayList<>();
|
||||||
|
for (DeletedBlocksTransaction block : blocks) {
|
||||||
|
txIDs.add(block.getTxID());
|
||||||
|
}
|
||||||
|
deletedBlockLog.commitTransactions(txIDs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIncrementCount() throws Exception {
|
||||||
|
int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
|
||||||
|
|
||||||
|
// Create 30 TXs in the log.
|
||||||
|
for (Map.Entry<String, List<String>> entry : generateData(30).entrySet()){
|
||||||
|
deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will return all TXs, total num 30.
|
||||||
|
List<DeletedBlocksTransaction> blocks =
|
||||||
|
deletedBlockLog.getTransactions(40);
|
||||||
|
List<Long> txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
for (int i = 0; i < maxRetry; i++) {
|
||||||
|
deletedBlockLog.incrementCount(txIDs);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Increment another time so it exceed the maxRetry.
|
||||||
|
// On this call, count will be set to -1 which means TX eventually fails.
|
||||||
|
deletedBlockLog.incrementCount(txIDs);
|
||||||
|
blocks = deletedBlockLog.getTransactions(40);
|
||||||
|
for (DeletedBlocksTransaction block : blocks) {
|
||||||
|
Assert.assertEquals(-1, block.getCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
// If all TXs are failed, getTransactions call will always return nothing.
|
||||||
|
blocks = deletedBlockLog.getTransactions(40);
|
||||||
|
Assert.assertEquals(blocks.size(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCommitTransactions() throws Exception {
|
||||||
|
for (Map.Entry<String, List<String>> entry : generateData(50).entrySet()){
|
||||||
|
deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
List<DeletedBlocksTransaction> blocks =
|
||||||
|
deletedBlockLog.getTransactions(20);
|
||||||
|
List<Long> txIDs = new ArrayList<>();
|
||||||
|
for (DeletedBlocksTransaction block : blocks) {
|
||||||
|
txIDs.add(block.getTxID());
|
||||||
|
}
|
||||||
|
// Add an invalid txID.
|
||||||
|
txIDs.add(70L);
|
||||||
|
deletedBlockLog.commitTransactions(txIDs);
|
||||||
|
blocks = deletedBlockLog.getTransactions(50);
|
||||||
|
Assert.assertEquals(30, blocks.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRandomOperateTransactions() throws Exception {
|
||||||
|
Random random = new Random();
|
||||||
|
int added = 0, committed = 0;
|
||||||
|
List<DeletedBlocksTransaction> blocks = new ArrayList<>();
|
||||||
|
List<Long> txIDs = new ArrayList<>();
|
||||||
|
byte[] latestTxid = DFSUtil.string2Bytes("#LATEST_TXID#");
|
||||||
|
MetadataKeyFilters.MetadataKeyFilter avoidLatestTxid =
|
||||||
|
(preKey, currentKey, nextKey) ->
|
||||||
|
!Arrays.equals(latestTxid, currentKey);
|
||||||
|
MetadataStore store = deletedBlockLog.getDeletedStore();
|
||||||
|
// Randomly add/get/commit/increase transactions.
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
int state = random.nextInt(4);
|
||||||
|
if (state == 0) {
|
||||||
|
for (Map.Entry<String, List<String>> entry :
|
||||||
|
generateData(10).entrySet()){
|
||||||
|
deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
added += 10;
|
||||||
|
} else if (state == 1) {
|
||||||
|
blocks = deletedBlockLog.getTransactions(20);
|
||||||
|
txIDs = new ArrayList<>();
|
||||||
|
for (DeletedBlocksTransaction block : blocks) {
|
||||||
|
txIDs.add(block.getTxID());
|
||||||
|
}
|
||||||
|
deletedBlockLog.incrementCount(txIDs);
|
||||||
|
} else if (state == 2) {
|
||||||
|
txIDs = new ArrayList<>();
|
||||||
|
for (DeletedBlocksTransaction block : blocks) {
|
||||||
|
txIDs.add(block.getTxID());
|
||||||
|
}
|
||||||
|
blocks = new ArrayList<>();
|
||||||
|
committed += txIDs.size();
|
||||||
|
deletedBlockLog.commitTransactions(txIDs);
|
||||||
|
} else {
|
||||||
|
// verify the number of added and committed.
|
||||||
|
List<Map.Entry<byte[], byte[]>> result =
|
||||||
|
store.getRangeKVs(null, added, avoidLatestTxid);
|
||||||
|
Assert.assertEquals(added, result.size() + committed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPersistence() throws Exception {
|
||||||
|
for (Map.Entry<String, List<String>> entry : generateData(50).entrySet()){
|
||||||
|
deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
// close db and reopen it again to make sure
|
||||||
|
// transactions are stored persistently.
|
||||||
|
deletedBlockLog.close();
|
||||||
|
deletedBlockLog = new DeletedBlockLogImpl(conf);
|
||||||
|
List<DeletedBlocksTransaction> blocks =
|
||||||
|
deletedBlockLog.getTransactions(10);
|
||||||
|
List<Long> txIDs = new ArrayList<>();
|
||||||
|
for (DeletedBlocksTransaction block : blocks) {
|
||||||
|
txIDs.add(block.getTxID());
|
||||||
|
}
|
||||||
|
deletedBlockLog.commitTransactions(txIDs);
|
||||||
|
blocks = deletedBlockLog.getTransactions(10);
|
||||||
|
Assert.assertEquals(10, blocks.size());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue