HDDS-360. Use RocksDBStore and TableStore for SCM Metadata.

Contributed by Anu Engineer.
Nanda kumar 2019-02-12 14:25:14 +05:30
parent 26e60135f5
commit a536eb5c41
30 changed files with 1121 additions and 408 deletions

View File

@ -115,6 +115,7 @@ public final class OzoneConsts {
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
public static final String OM_DB_NAME = "om.db";
public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db";
public static final String SCM_DB_NAME = "scm.db";
public static final String STORAGE_DIR_CHUNKS = "chunks";

View File

@ -41,7 +41,7 @@ public class RDBStoreIterator
@Override
public void forEachRemaining(
Consumer<? super ByteArrayKeyValue> action) {
while(hasNext()) {
while (hasNext()) {
action.accept(next());
}
}
@ -56,7 +56,7 @@ public class RDBStoreIterator
if (rocksDBIterator.isValid()) {
ByteArrayKeyValue value =
ByteArrayKeyValue.create(rocksDBIterator.key(), rocksDBIterator
.value());
.value());
rocksDBIterator.next();
return value;
}
@ -83,6 +83,23 @@ public class RDBStoreIterator
return null;
}
@Override
public byte[] key() {
if (rocksDBIterator.isValid()) {
return rocksDBIterator.key();
}
return null;
}
@Override
public ByteArrayKeyValue value() {
if (rocksDBIterator.isValid()) {
return ByteArrayKeyValue.create(rocksDBIterator.key(),
rocksDBIterator.value());
}
return null;
}
@Override
public void close() throws IOException {
rocksDBIterator.close();

View File

@ -47,4 +47,16 @@ public interface TableIterator<KEY, T> extends Iterator<T>, Closeable {
*/
T seek(KEY key);
/**
* Returns the key at the current position.
* @return KEY
*/
KEY key();
/**
* Returns the VALUE at the current position.
* @return VALUE
*/
T value();
}
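These accessors let a caller inspect the cursor position without consuming the entry; the RocksDB-backed SCM store later in this change uses them, for example, to read the last recorded transaction id. A minimal sketch of the intended usage, assuming an enclosing method that declares throws IOException and a hypothetical deletedBlocksTable:

// Sketch: jump to the last entry and read its key without advancing.
try (TableIterator<Long,
    ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
    deletedBlocksTable.iterator()) {
  iter.seekToLast();
  Long lastTxID = iter.key();   // null when the table is empty
}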

View File

@ -140,12 +140,16 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
private TableIterator<byte[], ? extends KeyValue<byte[], byte[]>>
rawIterator;
private final Class<KEY> keyClass;
private final Class<VALUE> valueClass;
public TypedTableIterator(
TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> rawIterator,
Class<KEY> keyType,
Class<VALUE> valueType) {
this.rawIterator = rawIterator;
keyClass = keyType;
valueClass = valueType;
}
@Override
@ -168,6 +172,24 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
return new TypedKeyValue(result);
}
@Override
public KEY key() {
byte[] result = rawIterator.key();
if (result == null) {
return null;
}
return codecRegistry.asObject(result, keyClass);
}
@Override
public TypedKeyValue value() {
KeyValue keyValue = rawIterator.value();
if (keyValue != null) {
return new TypedKeyValue(keyValue, keyClass, valueClass);
}
return null;
}
@Override
public void close() throws IOException {
rawIterator.close();

View File

@ -18,14 +18,25 @@
package org.apache.hadoop.hdds.scm;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.chillmode.Precheck;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Collection;
/**
* SCM utility class.
*/
public final class ScmUtils {
private static final Logger LOG = LoggerFactory
.getLogger(ScmUtils.class);
private ScmUtils() {
}
@ -42,4 +53,30 @@ public final class ScmUtils {
preCheck.check(operation);
}
}
public static File getDBPath(Configuration conf, String dbDirectory) {
final Collection<String> dbDirs =
conf.getTrimmedStringCollection(dbDirectory);
if (dbDirs.size() > 1) {
throw new IllegalArgumentException(
"Bad configuration setting " + dbDirectory
+ ". OM does not support multiple metadata dirs currently.");
}
if (dbDirs.size() == 1) {
final File dbDirPath = new File(dbDirs.iterator().next());
if (!dbDirPath.exists() && !dbDirPath.mkdirs()) {
throw new IllegalArgumentException(
"Unable to create directory " + dbDirPath
+ " specified in configuration setting " + dbDirectory);
}
return dbDirPath;
}
LOG.warn("{} is not configured. We recommend adding this setting. "
+ "Falling back to {} instead.", dbDirectory,
HddsConfigKeys.OZONE_METADATA_DIRS);
return ServerUtils.getOzoneMetaDirPath(conf);
}
}
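A small sketch of how this helper is expected to be used when resolving the directory that holds scm.db. The configuration key below is a placeholder for illustration, not a key introduced by this change:

// Sketch (placeholder key name): resolve the SCM metadata dir and DB file.
File scmDbDir = ScmUtils.getDBPath(conf, "hdds.scm.db.dirs");
File scmDbFile = new File(scmDbDir, OzoneConsts.SCM_DB_NAME);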

View File

@ -16,50 +16,45 @@
*/
package org.apache.hadoop.hdds.scm.block;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.chillmode.ChillModePrecheck;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.chillmode.ChillModePrecheck;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.INVALID_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.INVALID_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
/** Block Manager manages the block access for SCM. */
public class BlockManagerImpl implements EventHandler<Boolean>,
@ -85,18 +80,14 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
* Constructor.
*
* @param conf - configuration.
* @param nodeManager - node manager.
* @param pipelineManager - pipeline manager.
* @param containerManager - container manager.
* @param eventPublisher - event publisher.
* @param scm - StorageContainerManager instance.
* @throws IOException
*/
public BlockManagerImpl(final Configuration conf,
final NodeManager nodeManager, final PipelineManager pipelineManager,
final ContainerManager containerManager, EventPublisher eventPublisher)
public BlockManagerImpl(final Configuration conf, StorageContainerManager scm)
throws IOException {
this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
Objects.requireNonNull(scm, "SCM cannot be null");
this.pipelineManager = scm.getPipelineManager();
this.containerManager = scm.getContainerManager();
this.containerSize = (long)conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
@ -106,7 +97,8 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
// SCM block deleting transaction log and deleting service.
deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
deletedBlockLog = new DeletedBlockLogImpl(conf, scm.getContainerManager(),
scm.getScmMetadataStore());
long svcInterval =
conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
@ -118,7 +110,8 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
TimeUnit.MILLISECONDS);
blockDeletingService =
new SCMBlockDeletingService(deletedBlockLog, containerManager,
nodeManager, eventPublisher, svcInterval, serviceTimeout, conf);
scm.getScmNodeManager(), scm.getEventQueue(), svcInterval,
serviceTimeout, conf);
chillModePrecheck = new ChillModePrecheck(conf);
}
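With this signature the block manager pulls its collaborators (pipeline manager, container manager, node manager, event queue and metadata store) from the SCM instance instead of taking them one by one, so the wiring during SCM start-up, shown further down in this change, reduces to a single call:

// Inside StorageContainerManager's system-manager initialization:
scmBlockManager = new BlockManagerImpl(conf, this);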

View File

@ -17,39 +17,8 @@
*/
package org.apache.hadoop.hdds.scm.block;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.scm.command
.CommandStatusReportHandler.DeleteBlockStatus;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -60,17 +29,29 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.utils.db.BatchOperation;
import org.apache.hadoop.utils.db.Table;
import org.apache.hadoop.utils.db.TableIterator;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.Math.min;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
/**
* An implementation class of {@link DeletedBlockLog}, and it uses
@ -88,40 +69,21 @@ public class DeletedBlockLogImpl
public static final Logger LOG =
LoggerFactory.getLogger(DeletedBlockLogImpl.class);
private static final byte[] LATEST_TXID =
DFSUtil.string2Bytes("#LATEST_TXID#");
private final int maxRetry;
private final MetadataStore deletedStore;
private final ContainerManager containerManager;
private final SCMMetadataStore scmMetadataStore;
private final Lock lock;
// The latest id of deleted blocks in the db.
private long lastTxID;
// Maps txId to set of DNs which are successful in committing the transaction
private Map<Long, Set<UUID>> transactionToDNsCommitMap;
public DeletedBlockLogImpl(Configuration conf,
ContainerManager containerManager) throws IOException {
ContainerManager containerManager,
SCMMetadataStore scmMetadataStore) {
maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
final File metaDir = ServerUtils.getScmDbDir(conf);
final String scmMetaDataDir = metaDir.getPath();
final File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB);
final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
// Load store of all transactions.
deletedStore = MetadataStoreBuilder.newBuilder()
.setCreateIfMissing(true)
.setConf(conf)
.setDbFile(deletedLogDbPath)
.setCacheSize(cacheSize * OzoneConsts.MB)
.build();
this.containerManager = containerManager;
this.scmMetadataStore = scmMetadataStore;
this.lock = new ReentrantLock();
// start from the head of deleted store.
lastTxID = findLatestTxIDInStore();
// transactionToDNsCommitMap is updated only when
// transaction is added to the log and when it is removed.
@ -130,26 +92,6 @@ public class DeletedBlockLogImpl
transactionToDNsCommitMap = new ConcurrentHashMap<>();
}
@VisibleForTesting
public MetadataStore getDeletedStore() {
return deletedStore;
}
/**
* There is no need to lock before reading because
* it's only used in construct method.
*
* @return latest txid.
* @throws IOException
*/
private long findLatestTxIDInStore() throws IOException {
long txid = 0;
byte[] value = deletedStore.get(LATEST_TXID);
if (value != null) {
txid = Longs.fromByteArray(value);
}
return txid;
}
@Override
public List<DeletedBlocksTransaction> getFailedTransactions()
@ -157,16 +99,16 @@ public class DeletedBlockLogImpl
lock.lock();
try {
final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
deletedStore.iterate(null, (key, value) -> {
if (!Arrays.equals(LATEST_TXID, key)) {
DeletedBlocksTransaction delTX =
DeletedBlocksTransaction.parseFrom(value);
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() == -1) {
failedTXs.add(delTX);
}
}
return true;
});
}
return failedTXs;
} finally {
lock.unlock();
@ -181,44 +123,44 @@ public class DeletedBlockLogImpl
*/
@Override
public void incrementCount(List<Long> txIDs) throws IOException {
BatchOperation batch = new BatchOperation();
lock.lock();
try {
for(Long txID : txIDs) {
try {
byte[] deleteBlockBytes =
deletedStore.get(Longs.toByteArray(txID));
if (deleteBlockBytes == null) {
LOG.warn("Delete txID {} not found", txID);
continue;
}
DeletedBlocksTransaction block = DeletedBlocksTransaction
.parseFrom(deleteBlockBytes);
DeletedBlocksTransaction.Builder builder = block.toBuilder();
int currentCount = block.getCount();
if (currentCount > -1) {
builder.setCount(++currentCount);
}
// if the retry time exceeds the maxRetry value
// then set the retry value to -1, stop retrying, admins can
// analyze those blocks and purge them manually by SCMCli.
if (currentCount > maxRetry) {
builder.setCount(-1);
}
deletedStore.put(Longs.toByteArray(txID),
builder.build().toByteArray());
} catch (IOException ex) {
LOG.warn("Cannot increase count for txID " + txID, ex);
for (Long txID : txIDs) {
lock.lock();
try {
DeletedBlocksTransaction block =
scmMetadataStore.getDeletedBlocksTXTable().get(txID);
if (block == null) {
// Should we make this an error ? How can we not find the deleted
// TXID?
LOG.warn("Deleted TXID not found.");
continue;
}
DeletedBlocksTransaction.Builder builder = block.toBuilder();
int currentCount = block.getCount();
if (currentCount > -1) {
builder.setCount(++currentCount);
}
// if the retry time exceeds the maxRetry value
// then set the retry value to -1, stop retrying, admins can
// analyze those blocks and purge them manually by SCMCli.
if (currentCount > maxRetry) {
builder.setCount(-1);
}
scmMetadataStore.getDeletedBlocksTXTable().put(txID,
builder.build());
} catch (IOException ex) {
LOG.warn("Cannot increase count for txID " + txID, ex);
// We do not throw error here, since we don't want to abort the loop.
// Just log and continue processing the rest of txids.
} finally {
lock.unlock();
}
deletedStore.writeBatch(batch);
} finally {
lock.unlock();
}
}
private DeletedBlocksTransaction constructNewTransaction(long txID,
long containerID, List<Long> blocks) {
long containerID,
List<Long> blocks) {
return DeletedBlocksTransaction.newBuilder()
.setTxID(txID)
.setContainerID(containerID)
@ -231,7 +173,8 @@ public class DeletedBlockLogImpl
* {@inheritDoc}
*
* @param transactionResults - transaction IDs.
* @param dnID - Id of Datanode which has acknowledged a delete block command.
* @param dnID - Id of Datanode which has acknowledged
* a delete block command.
* @throws IOException
*/
@Override
@ -259,8 +202,8 @@ public class DeletedBlockLogImpl
}
dnsWithCommittedTxn.add(dnID);
final ContainerInfo container = containerManager
.getContainer(containerId);
final ContainerInfo container =
containerManager.getContainer(containerId);
final Set<ContainerReplica> replicas =
containerManager.getContainerReplicas(containerId);
// The delete entry can be safely removed from the log if all the
@ -275,7 +218,7 @@ public class DeletedBlockLogImpl
if (dnsWithCommittedTxn.containsAll(containerDns)) {
transactionToDNsCommitMap.remove(txID);
LOG.debug("Purging txId={} from block deletion log", txID);
deletedStore.delete(Longs.toByteArray(txID));
scmMetadataStore.getDeletedBlocksTXTable().delete(txID);
}
}
LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
@ -308,24 +251,18 @@ public class DeletedBlockLogImpl
* {@inheritDoc}
*
* @param containerID - container ID.
* @param blocks - blocks that belong to the same container.
* @param blocks - blocks that belong to the same container.
* @throws IOException
*/
@Override
public void addTransaction(long containerID, List<Long> blocks)
throws IOException {
BatchOperation batch = new BatchOperation();
lock.lock();
try {
DeletedBlocksTransaction tx = constructNewTransaction(lastTxID + 1,
containerID, blocks);
byte[] key = Longs.toByteArray(lastTxID + 1);
batch.put(key, tx.toByteArray());
batch.put(LATEST_TXID, Longs.toByteArray(lastTxID + 1));
deletedStore.writeBatch(batch);
lastTxID += 1;
Long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
DeletedBlocksTransaction tx =
constructNewTransaction(nextTXID, containerID, blocks);
scmMetadataStore.getDeletedBlocksTXTable().put(nextTXID, tx);
} finally {
lock.unlock();
}
@ -336,17 +273,16 @@ public class DeletedBlockLogImpl
lock.lock();
try {
final AtomicInteger num = new AtomicInteger(0);
deletedStore.iterate(null, (key, value) -> {
// Exclude latest txid record
if (!Arrays.equals(LATEST_TXID, key)) {
DeletedBlocksTransaction delTX =
DeletedBlocksTransaction.parseFrom(value);
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() > -1) {
num.incrementAndGet();
}
}
return true;
});
}
return num.get();
} finally {
lock.unlock();
@ -360,24 +296,19 @@ public class DeletedBlockLogImpl
* @throws IOException
*/
@Override
public void addTransactions(
Map<Long, List<Long>> containerBlocksMap)
public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
throws IOException {
BatchOperation batch = new BatchOperation();
lock.lock();
try {
long currentLatestID = lastTxID;
for (Map.Entry<Long, List<Long>> entry :
containerBlocksMap.entrySet()) {
currentLatestID += 1;
byte[] key = Longs.toByteArray(currentLatestID);
DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
BatchOperation batch = scmMetadataStore.getStore().initBatchOperation();
for (Map.Entry<Long, List<Long>> entry : containerBlocksMap.entrySet()) {
long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
entry.getKey(), entry.getValue());
batch.put(key, tx.toByteArray());
scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch,
nextTXID, tx);
}
lastTxID = currentLatestID;
batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
deletedStore.writeBatch(batch);
scmMetadataStore.getStore().commitBatchOperation(batch);
} finally {
lock.unlock();
}
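The rewritten method follows the DBStore batch pattern: transaction ids come from the metadata store's counter rather than the old lastTxID field, the puts are staged against the typed table, and everything is committed atomically, which makes the old #LATEST_TXID# bookkeeping record unnecessary. A condensed sketch of that pattern, reusing the names above:

// Sketch: stage typed puts and commit them as a single atomic batch.
BatchOperation batch = scmMetadataStore.getStore().initBatchOperation();
for (Map.Entry<Long, List<Long>> entry : containerBlocksMap.entrySet()) {
  long txID = scmMetadataStore.getNextDeleteBlockTXID();
  scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch, txID,
      constructNewTransaction(txID, entry.getKey(), entry.getValue()));
}
scmMetadataStore.getStore().commitBatchOperation(batch);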
@ -385,9 +316,6 @@ public class DeletedBlockLogImpl
@Override
public void close() throws IOException {
if (deletedStore != null) {
deletedStore.close();
}
}
@Override
@ -396,11 +324,12 @@ public class DeletedBlockLogImpl
lock.lock();
try {
Map<Long, Long> deleteTransactionMap = new HashMap<>();
deletedStore.iterate(null, (key, value) -> {
if (!Arrays.equals(LATEST_TXID, key)) {
DeletedBlocksTransaction block = DeletedBlocksTransaction
.parseFrom(value);
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
while (iter.hasNext()) {
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = iter.next();
DeletedBlocksTransaction block = keyValue.getValue();
if (block.getCount() > -1 && block.getCount() <= maxRetry) {
if (transactions.addTransaction(block,
transactionToDNsCommitMap.get(block.getTxID()))) {
@ -409,10 +338,8 @@ public class DeletedBlockLogImpl
.putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
}
}
return !transactions.isFull();
}
return true;
});
}
return deleteTransactionMap;
} finally {
lock.unlock();
@ -421,7 +348,7 @@ public class DeletedBlockLogImpl
@Override
public void onMessage(DeleteBlockStatus deleteBlockStatus,
EventPublisher publisher) {
EventPublisher publisher) {
ContainerBlocksDeletionACKProto ackProto =
deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
commitTransactions(ackProto.getResultsList(),

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.scm.metadata;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.utils.db.Codec;
/**
* Codec for Persisting the DeletedBlocks.
*/
public class DeletedBlocksTransactionCodec
implements Codec<DeletedBlocksTransaction> {
@Override
public byte[] toPersistedFormat(DeletedBlocksTransaction object) {
return object.toByteArray();
}
@Override
public DeletedBlocksTransaction fromPersistedFormat(byte[] rawData) {
try {
return DeletedBlocksTransaction.parseFrom(rawData);
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(
"Can't convert rawBytes to DeletedBlocksTransaction.", e);
}
}
}
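The codec is a thin wrapper around the protobuf serialization. A round-trip sketch, where blocks is an assumed List&lt;Long&gt; and the builder fields mirror constructNewTransaction in DeletedBlockLogImpl:

// Sketch: serialize a transaction and read it back through the codec.
DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
    .setTxID(1L)
    .setContainerID(42L)
    .addAllLocalID(blocks)   // assumption: localID is the repeated block-id field
    .setCount(0)
    .build();
DeletedBlocksTransactionCodec codec = new DeletedBlocksTransactionCodec();
byte[] raw = codec.toPersistedFormat(tx);
DeletedBlocksTransaction restored = codec.fromPersistedFormat(raw);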

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.scm.metadata;
import com.google.common.primitives.Longs;
import org.apache.hadoop.utils.db.Codec;
/**
* Codec for persisting Long values.
*/
public class LongCodec implements Codec<Long> {
@Override
public byte[] toPersistedFormat(Long object) {
return Longs.toByteArray(object);
}
@Override
public Long fromPersistedFormat(byte[] rawData) {
return Longs.fromByteArray(rawData);
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.metadata;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.utils.db.DBStore;
import org.apache.hadoop.utils.db.Table;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
/**
* Generic interface for data stores for SCM.
* This is similar to the OMMetadataStore class,
* where we persist typed data into some underlying storage system.
*/
public interface SCMMetadataStore {
/**
* Start metadata manager.
*
* @param configuration - Configuration
* @throws IOException - Unable to start metadata store.
*/
void start(OzoneConfiguration configuration) throws IOException;
/**
* Stop metadata manager.
*/
void stop() throws Exception;
/**
* Get metadata store.
*
* @return metadata store.
*/
@VisibleForTesting
DBStore getStore();
/**
* A Table that keeps the deleted blocks lists and transactions.
* @return Table
*/
Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable();
/**
* Returns the current TXID for the deleted blocks.
* @return Long
*/
Long getCurrentTXID();
/**
* Returns the next TXID for the Deleted Blocks.
* @return Long.
*/
Long getNextDeleteBlockTXID();
}
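DeletedBlockLogImpl (earlier in this change) talks to SCM metadata only through this interface, so the codec-aware table hides all serialization. A minimal sketch of the typed read/update path; variable names are illustrative and the table calls can throw IOException:

// Sketch: look a transaction up by id, bump its retry count, remove it when done.
Table<Long, DeletedBlocksTransaction> txTable =
    scmMetadataStore.getDeletedBlocksTXTable();
DeletedBlocksTransaction tx = txTable.get(txID);
if (tx != null) {
  txTable.put(txID, tx.toBuilder().setCount(tx.getCount() + 1).build());
}
txTable.delete(txID);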

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.metadata;
import java.io.File;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import java.io.IOException;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.utils.db.DBStore;
import org.apache.hadoop.utils.db.DBStoreBuilder;
import org.apache.hadoop.utils.db.Table;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.utils.db.TableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_DB_NAME;
/**
* A RocksDB based implementation of SCM Metadata Store.
* <p>
* <p>
* +---------------+------------+-------------------------+
* | Column Family | Key | Value |
* +---------------+------------+-------------------------+
* | DeletedBlocks | TXID(Long) | DeletedBlockTransaction |
* +---------------+------------+-------------------------+
*/
public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
private static final String DELETED_BLOCKS_TABLE = "deletedBlocks";
private Table deletedBlocksTable;
private static final Logger LOG =
LoggerFactory.getLogger(SCMMetadataStoreRDBImpl.class);
private DBStore store;
private final OzoneConfiguration configuration;
private final AtomicLong txID;
/**
* Constructs the metadata store and starts the DB Services.
*
* @param config - Ozone Configuration.
* @throws IOException - on Failure.
*/
public SCMMetadataStoreRDBImpl(OzoneConfiguration config) throws IOException {
this.configuration = config;
start(this.configuration);
this.txID = new AtomicLong(this.getLargestRecordedTXID());
}
@Override
public void start(OzoneConfiguration config)
throws IOException {
if (this.store == null) {
File metaDir = ServerUtils.getScmDbDir(configuration);
this.store = DBStoreBuilder.newBuilder(configuration)
.setName(SCM_DB_NAME)
.setPath(Paths.get(metaDir.getPath()))
.addTable(DELETED_BLOCKS_TABLE)
.addCodec(DeletedBlocksTransaction.class,
new DeletedBlocksTransactionCodec())
.addCodec(Long.class, new LongCodec())
.build();
deletedBlocksTable = this.store.getTable(DELETED_BLOCKS_TABLE,
Long.class, DeletedBlocksTransaction.class);
checkTableStatus(deletedBlocksTable, DELETED_BLOCKS_TABLE);
}
}
@Override
public void stop() throws Exception {
if (store != null) {
store.close();
store = null;
}
}
@Override
public org.apache.hadoop.utils.db.DBStore getStore() {
return this.store;
}
@Override
public Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable() {
return deletedBlocksTable;
}
@Override
public Long getNextDeleteBlockTXID() {
return this.txID.incrementAndGet();
}
@Override
public Long getCurrentTXID() {
return this.txID.get();
}
/**
* Returns the largest recorded TXID from the DB.
*
* @return Long
* @throws IOException
*/
private Long getLargestRecordedTXID() throws IOException {
try (TableIterator<Long, DeletedBlocksTransaction> txIter =
deletedBlocksTable.iterator()) {
txIter.seekToLast();
Long txid = txIter.key();
if (txid != null) {
return txid;
}
}
return 0L;
}
private void checkTableStatus(Table table, String name) throws IOException {
String logMessage = "Unable to get a reference to %s table. Cannot " +
"continue.";
String errMsg = "Inconsistent DB state, Table - %s. Please check the logs" +
"for more info.";
if (table == null) {
LOG.error(String.format(logMessage, name));
throw new IOException(String.format(errMsg, name));
}
}
}
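Construction opens (or creates) scm.db under the SCM metadata directory and seeds the in-memory id counter from the largest key on disk, so delete-block transaction ids keep increasing across restarts. A lifecycle sketch, assuming the caller owns shutdown:

// Sketch: open the store, allocate an id, and shut the store down.
public static void demo(OzoneConfiguration conf) throws Exception {
  SCMMetadataStore metadataStore = new SCMMetadataStoreRDBImpl(conf);
  try {
    long txID = metadataStore.getNextDeleteBlockTXID();  // largest recorded id + 1
    // ... write through metadataStore.getDeletedBlocksTXTable() ...
  } finally {
    metadataStore.stop();  // closes the underlying RocksDB store
  }
}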

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Metadata layer for SCM.
*/
package org.apache.hadoop.hdds.scm.metadata;

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@ -200,8 +199,8 @@ public class SCMNodeManager implements NodeManager {
return VersionResponse.newBuilder()
.setVersion(this.version.getVersion())
.addValue(OzoneConsts.SCM_ID,
this.scmManager.getScmStorage().getScmId())
.addValue(OzoneConsts.CLUSTER_ID, this.scmManager.getScmStorage()
this.scmManager.getScmStorageConfig().getScmId())
.addValue(OzoneConsts.CLUSTER_ID, this.scmManager.getScmStorageConfig()
.getClusterID())
.build();
}

View File

@ -248,8 +248,8 @@ public class SCMBlockProtocolServer implements
try{
ScmInfo.Builder builder =
new ScmInfo.Builder()
.setClusterId(scm.getScmStorage().getClusterID())
.setScmId(scm.getScmStorage().getScmId());
.setClusterId(scm.getScmStorageConfig().getClusterID())
.setScmId(scm.getScmStorageConfig().getScmId());
return builder.build();
} catch (Exception ex) {
auditSuccess = false;

View File

@ -422,8 +422,8 @@ public class SCMClientProtocolServer implements
try{
ScmInfo.Builder builder =
new ScmInfo.Builder()
.setClusterId(scm.getScmStorage().getClusterID())
.setScmId(scm.getScmStorage().getScmId());
.setClusterId(scm.getScmStorageConfig().getClusterID())
.setScmId(scm.getScmStorageConfig().getScmId());
return builder.build();
} catch (Exception ex) {
auditSuccess = false;

View File

@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.scm.server;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
/**
* This class acts as an SCM builder class. It is important from a
* resilience perspective: it allows us to swap out individual managers and
* replace them with our own implementations, for example during testing.
* <p>
* At some point in the future, we will make all these managers dynamically
* loadable, so other developers can extend SCM by replacing various managers.
* <p>
* TODO: Add different config keys, so that we can load different managers at
* run time. This will make it easy to extend SCM without having to replace
* whole SCM each time.
* <p>
* Different Managers supported by this builder are:
* NodeManager scmNodeManager;
* PipelineManager pipelineManager;
* ContainerManager containerManager;
* BlockManager scmBlockManager;
* ReplicationManager replicationManager;
* SCMChillModeManager scmChillModeManager;
* CertificateServer certificateServer;
* SCMMetadataStore scmMetadataStore.
*
* If any of these is *not* specified then the default implementation of
* that manager is used by SCM.
*
*/
public final class SCMConfigurator {
private NodeManager scmNodeManager;
private PipelineManager pipelineManager;
private ContainerManager containerManager;
private BlockManager scmBlockManager;
private ReplicationManager replicationManager;
private SCMChillModeManager scmChillModeManager;
private CertificateServer certificateServer;
private SCMMetadataStore metadataStore;
/**
* Allows user to specify a version of Node manager to use with this SCM.
* @param scmNodeManager - Node Manager.
*/
public void setScmNodeManager(NodeManager scmNodeManager) {
this.scmNodeManager = scmNodeManager;
}
/**
* Allows user to specify a custom version of PipelineManager to use with
* this SCM.
* @param pipelineManager - Pipeline Manager.
*/
public void setPipelineManager(PipelineManager pipelineManager) {
this.pipelineManager = pipelineManager;
}
/**
* Allows user to specify a custom version of containerManager to use with
* this SCM.
* @param containerManager - Container Manager.
*/
public void setContainerManager(ContainerManager containerManager) {
this.containerManager = containerManager;
}
/**
* Allows user to specify a custom version of Block Manager to use with
* this SCM.
* @param scmBlockManager - Block Manager
*/
public void setScmBlockManager(BlockManager scmBlockManager) {
this.scmBlockManager = scmBlockManager;
}
/**
* Allows user to specify a custom version of Replication Manager to use
* with this SCM.
* @param replicationManager - replication Manager.
*/
public void setReplicationManager(ReplicationManager replicationManager) {
this.replicationManager = replicationManager;
}
/**
* Allows user to specify a custom version of Chill Mode Manager to use
* with this SCM.
* @param scmChillModeManager - ChillMode Manager.
*/
public void setScmChillModeManager(SCMChillModeManager scmChillModeManager) {
this.scmChillModeManager = scmChillModeManager;
}
/**
* Allows user to specify a custom version of Certificate Server to use
* with this SCM.
* @param certificateAuthority - Certificate server.
*/
public void setCertificateServer(CertificateServer certificateAuthority) {
this.certificateServer = certificateAuthority;
}
/**
* Allows user to specify a custom version of Metadata Store to be used
* with this SCM.
* @param scmMetadataStore - scm metadata store.
*/
public void setMetadataStore(SCMMetadataStore scmMetadataStore) {
this.metadataStore = scmMetadataStore;
}
/**
* Gets SCM Node Manager.
* @return Node Manager.
*/
public NodeManager getScmNodeManager() {
return scmNodeManager;
}
/**
* Get Pipeline Manager.
* @return pipeline manager.
*/
public PipelineManager getPipelineManager() {
return pipelineManager;
}
/**
* Get Container Manager.
* @return Container Manager.
*/
public ContainerManager getContainerManager() {
return containerManager;
}
/**
* Get SCM Block Manager.
* @return Block Manager.
*/
public BlockManager getScmBlockManager() {
return scmBlockManager;
}
/**
* Get Replication Manager.
* @return Replication Manager.
*/
public ReplicationManager getReplicationManager() {
return replicationManager;
}
/**
* Gets Chill Mode Manager.
* @return Chill Mode manager.
*/
public SCMChillModeManager getScmChillModeManager() {
return scmChillModeManager;
}
/**
* Get Certificate Manager.
* @return Certificate Manager.
*/
public CertificateServer getCertificateServer() {
return certificateServer;
}
/**
* Get Metadata Store.
* @return SCMMetadataStore.
*/
public SCMMetadataStore getMetadataStore() {
return metadataStore;
}
}
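The configurator is mainly a seam for tests and future extensions: populate only the managers you want to override and hand it to the new StorageContainerManager constructor; anything left null falls back to the default implementation. A sketch, where MockNodeManager stands in for an assumed test double:

// Sketch: boot an SCM with a custom NodeManager, defaults for everything else.
SCMConfigurator configurator = new SCMConfigurator();
configurator.setScmNodeManager(new MockNodeManager());   // assumed test double
StorageContainerManager scm = new StorageContainerManager(conf, configurator);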

View File

@ -30,16 +30,16 @@ import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR;
/**
* SCMStorage is responsible for management of the StorageDirectories used by
* the SCM.
* SCMStorageConfig is responsible for management of the
* StorageDirectories used by the SCM.
*/
public class SCMStorage extends Storage {
public class SCMStorageConfig extends Storage {
/**
* Construct SCMStorage.
* Construct SCMStorageConfig.
* @throws IOException if any directories are inaccessible.
*/
public SCMStorage(OzoneConfiguration conf) throws IOException {
public SCMStorageConfig(OzoneConfiguration conf) throws IOException {
super(NodeType.SCM, ServerUtils.getScmDbDir(conf), STORAGE_DIR);
}
@ -70,4 +70,4 @@ public class SCMStorage extends Storage {
return scmProperties;
}
}
}

View File

@ -27,6 +27,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.protobuf.BlockingService;
import java.util.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
@ -58,6 +59,8 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreRDBImpl;
import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
import org.apache.hadoop.hdds.scm.node.NewNodeHandler;
import org.apache.hadoop.hdds.scm.node.NonHealthyToHealthyNodeHandler;
@ -125,8 +128,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
* and returns a pipeline.
*
* <p>A client once it gets a pipeline (a list of datanodes) will connect to
* the datanodes and
* create a container, which then can be used to store data.
* the datanodes and create a container, which then can be used to store data.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
public final class StorageContainerManager extends ServiceRuntimeInfoImpl
@ -158,16 +160,18 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private final SCMDatanodeProtocolServer datanodeProtocolServer;
private final SCMBlockProtocolServer blockProtocolServer;
private final SCMClientProtocolServer clientProtocolServer;
private final SCMSecurityProtocolServer securityProtocolServer;
private SCMSecurityProtocolServer securityProtocolServer;
/*
* State Managers of SCM.
*/
private final NodeManager scmNodeManager;
private final PipelineManager pipelineManager;
private final ContainerManager containerManager;
private final BlockManager scmBlockManager;
private final SCMStorage scmStorage;
private NodeManager scmNodeManager;
private PipelineManager pipelineManager;
private ContainerManager containerManager;
private BlockManager scmBlockManager;
private final SCMStorageConfig scmStorageConfig;
private SCMMetadataStore scmMetadataStore;
private final EventQueue eventQueue;
/*
@ -188,13 +192,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
*/
private Cache<String, ContainerStat> containerReportCache;
private final ReplicationManager replicationManager;
private ReplicationManager replicationManager;
private final LeaseManager<Long> commandWatcherLeaseManager;
private final ReplicationActivityStatus replicationStatus;
private final SCMChillModeManager scmChillModeManager;
private final CertificateServer certificateServer;
private SCMChillModeManager scmChillModeManager;
private CertificateServer certificateServer;
private JvmPauseMonitor jvmPauseMonitor;
private final OzoneConfiguration configuration;
@ -206,29 +210,54 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
*
* @param conf configuration
*/
private StorageContainerManager(OzoneConfiguration conf)
public StorageContainerManager(OzoneConfiguration conf)
throws IOException, AuthenticationException {
// default empty configurator means default managers will be used.
this(conf, new SCMConfigurator());
}
/**
* This constructor offers finer control over how SCM comes up.
* To use this, the caller needs to create an SCMConfigurator and set the
* managers that SCM should use; if a value is missing, SCM falls back to
* the default implementation for that manager.
*
* @param conf - Configuration
* @param configurator - configurator
*/
public StorageContainerManager(OzoneConfiguration conf,
SCMConfigurator configurator)
throws IOException, AuthenticationException {
super(HddsVersionInfo.HDDS_VERSION_INFO);
Objects.requireNonNull(configurator, "configurator cannot be null");
Objects.requireNonNull(conf, "configuration cannot be null");
configuration = conf;
StorageContainerManager.initMetrics();
initContainerReportCache(conf);
scmStorage = new SCMStorage(conf);
if (scmStorage.getState() != StorageState.INITIALIZED) {
throw new SCMException("SCM not initialized.", ResultCodes
.SCM_NOT_INITIALIZED);
// It is assumed the 'ozone scm --init' command creates the SCM Storage Config.
scmStorageConfig = new SCMStorageConfig(conf);
if (scmStorageConfig.getState() != StorageState.INITIALIZED) {
LOG.error("Please make sure you have run \'ozone scm --init\' " +
"command to generate all the required metadata.");
throw new SCMException("SCM not initialized due to storage config " +
"failure.", ResultCodes.SCM_NOT_INITIALIZED);
}
/*
* Important: this initialization sequence is assumed by some of our tests.
* TestSecureOzoneCluster assumes that security checks have to be passed
* before any artifacts like the SCM DB are created, so please do not add
* any other initialization above the security checks.
*/
// Authenticate SCM if security is enabled
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
loginAsSCMUser(conf);
certificateServer = initializeCertificateServer(
getScmStorage().getClusterID(), getScmStorage().getScmId());
// TODO: Support Intermediary CAs in future.
certificateServer.init(new SecurityConfig(conf),
CertificateServer.CAType.SELF_SIGNED_CA);
securityProtocolServer = new SCMSecurityProtocolServer(conf,
certificateServer);
initializeCAnSecurityProtocol(conf, configurator);
} else {
// if no Security, we do not create a Certificate Server at all.
// This allows user to boot SCM without security temporarily
@ -237,16 +266,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
securityProtocolServer = null;
}
// Creates the SCM DBs or opens them if they already exist.
initializeMetadataStore(conf, configurator);
eventQueue = new EventQueue();
scmNodeManager = new SCMNodeManager(
conf, scmStorage.getClusterID(), this, eventQueue);
pipelineManager = new SCMPipelineManager(conf, scmNodeManager, eventQueue);
containerManager = new SCMContainerManager(
conf, scmNodeManager, pipelineManager, eventQueue);
scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
pipelineManager, containerManager, eventQueue);
long watcherTimeout =
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
watcherTimeout);
initializeSystemManagers(conf, configurator);
replicationStatus = new ReplicationActivityStatus();
CloseContainerEventHandler closeContainerHandler =
@ -280,12 +309,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
PipelineActionHandler pipelineActionHandler =
new PipelineActionHandler(pipelineManager, conf);
long watcherTimeout =
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
watcherTimeout);
RetriableDatanodeEventWatcher retriableDatanodeEventWatcher =
new RetriableDatanodeEventWatcher<>(
@ -294,13 +317,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
commandWatcherLeaseManager);
retriableDatanodeEventWatcher.start(eventQueue);
//TODO: support configurable containerPlacement policy
ContainerPlacementPolicy containerPlacementPolicy =
new SCMContainerPlacementCapacity(scmNodeManager, conf);
replicationManager = new ReplicationManager(containerPlacementPolicy,
containerManager, eventQueue, commandWatcherLeaseManager);
scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
.OZONE_ADMINISTRATORS);
scmUsername = UserGroupInformation.getCurrentUser().getUserName();
@ -342,13 +358,120 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
replicationStatus.getChillModeStatusListener());
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
(BlockManagerImpl) scmBlockManager);
scmChillModeManager = new SCMChillModeManager(conf,
containerManager.getContainers(), pipelineManager, eventQueue);
eventQueue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
scmChillModeManager);
registerMXBean();
}
/**
* This function initializes the following managers. If the configurator
* specifies a value, we will use it, else we will use the default value.
*
* Node Manager
* Pipeline Manager
* Container Manager
* Block Manager
* Replication Manager
* Chill Mode Manager
*
* @param conf - Ozone Configuration.
* @param configurator - A customizer which allows different managers to be
* used if needed.
* @throws IOException - on Failure.
*/
private void initializeSystemManagers(OzoneConfiguration conf,
SCMConfigurator configurator)
throws IOException {
if (configurator.getScmNodeManager() != null) {
scmNodeManager = configurator.getScmNodeManager();
} else {
scmNodeManager = new SCMNodeManager(
conf, scmStorageConfig.getClusterID(), this, eventQueue);
}
//TODO: support configurable containerPlacement policy
ContainerPlacementPolicy containerPlacementPolicy =
new SCMContainerPlacementCapacity(scmNodeManager, conf);
if (configurator.getPipelineManager() != null) {
pipelineManager = configurator.getPipelineManager();
} else {
pipelineManager =
new SCMPipelineManager(conf, scmNodeManager, eventQueue);
}
if (configurator.getContainerManager() != null) {
containerManager = configurator.getContainerManager();
} else {
containerManager = new SCMContainerManager(
conf, scmNodeManager, pipelineManager, eventQueue);
}
if (configurator.getScmBlockManager() != null) {
scmBlockManager = configurator.getScmBlockManager();
} else {
scmBlockManager = new BlockManagerImpl(conf, this);
}
if (configurator.getReplicationManager() != null) {
replicationManager = configurator.getReplicationManager();
} else {
replicationManager = new ReplicationManager(containerPlacementPolicy,
containerManager, eventQueue, commandWatcherLeaseManager);
}
if (configurator.getScmChillModeManager() != null) {
scmChillModeManager = configurator.getScmChillModeManager();
} else {
scmChillModeManager = new SCMChillModeManager(conf,
containerManager.getContainers(), pipelineManager, eventQueue);
}
}
/**
* If security is enabled we need to have the Security Protocol and a
* default CA. This function initializes those values based on the
* configurator.
*
* @param conf - Config
* @param configurator - configurator
* @throws IOException - on Failure
* @throws AuthenticationException - on Failure
*/
private void initializeCAnSecurityProtocol(OzoneConfiguration conf,
SCMConfigurator configurator)
throws IOException, AuthenticationException {
loginAsSCMUser(conf);
if (configurator.getCertificateServer() != null) {
this.certificateServer = configurator.getCertificateServer();
} else {
certificateServer = initializeCertificateServer(
getScmStorageConfig().getClusterID(),
getScmStorageConfig().getScmId());
}
// TODO: Support Intermediary CAs in future.
certificateServer.init(new SecurityConfig(conf),
CertificateServer.CAType.SELF_SIGNED_CA);
securityProtocolServer = new SCMSecurityProtocolServer(conf,
certificateServer);
}
/**
* Init the metadata store based on the configurator.
* @param conf - Config
* @param configurator - configurator
* @throws IOException - on Failure
*/
private void initializeMetadataStore(OzoneConfiguration conf,
SCMConfigurator configurator)
throws IOException {
if (configurator.getMetadataStore() != null) {
scmMetadataStore = configurator.getMetadataStore();
} else {
scmMetadataStore = new SCMMetadataStoreRDBImpl(conf);
if (scmMetadataStore == null) {
throw new SCMException("Unable to initialize metadata store",
ResultCodes.SCM_NOT_INITIALIZED);
}
}
}
/**
@ -393,7 +516,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
// So it is easy to use different Certificate Servers if needed.
String subject = "scm@" + InetAddress.getLocalHost().getHostName();
return new DefaultCAServer(subject, clusterID, scmID);
}
/**
@ -562,21 +684,21 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
* @throws IOException if init fails due to I/O error
*/
public static boolean scmInit(OzoneConfiguration conf) throws IOException {
SCMStorage scmStorage = new SCMStorage(conf);
StorageState state = scmStorage.getState();
SCMStorageConfig scmStorageConfig = new SCMStorageConfig(conf);
StorageState state = scmStorageConfig.getState();
if (state != StorageState.INITIALIZED) {
try {
String clusterId = StartupOption.INIT.getClusterId();
if (clusterId != null && !clusterId.isEmpty()) {
scmStorage.setClusterId(clusterId);
scmStorageConfig.setClusterId(clusterId);
}
scmStorage.initialize();
scmStorageConfig.initialize();
System.out.println(
"SCM initialization succeeded."
+ "Current cluster id for sd="
+ scmStorage.getStorageDir()
+ scmStorageConfig.getStorageDir()
+ ";cid="
+ scmStorage.getClusterID());
+ scmStorageConfig.getClusterID());
return true;
} catch (IOException ioe) {
LOG.error("Could not initialize SCM version file", ioe);
@ -586,9 +708,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
System.out.println(
"SCM already initialized. Reusing existing"
+ " cluster id for sd="
+ scmStorage.getStorageDir()
+ scmStorageConfig.getStorageDir()
+ ";cid="
+ scmStorage.getClusterID());
+ scmStorageConfig.getClusterID());
return true;
}
}
@ -649,8 +771,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
return metrics == null ? SCMMetrics.create() : metrics;
}
public SCMStorage getScmStorage() {
return scmStorage;
public SCMStorageConfig getScmStorageConfig() {
return scmStorageConfig;
}
public SCMDatanodeProtocolServer getDatanodeProtocolServer() {
@ -878,6 +1000,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
}
IOUtils.cleanupWithLogger(LOG, containerManager);
IOUtils.cleanupWithLogger(LOG, pipelineManager);
try {
scmMetadataStore.stop();
} catch (Exception ex) {
LOG.error("SCM Metadata store stop failed", ex);
}
}
/**
@ -1049,6 +1177,14 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
return nodeStateCount;
}
/**
   * Returns the SCM metadata store.
* @return SCMMetadataStore
*/
public SCMMetadataStore getScmMetadataStore() {
return scmMetadataStore;
}
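Callers read SCM metadata through the typed tables exposed by this store; for example, the deleted-blocks transaction log is now keyed by the transaction id as a Long, as the tests in this change do:
    // Look up a single deleted-blocks transaction by its id.
    Table<Long, DeletedBlocksTransaction> txTable =
        scm.getScmMetadataStore().getDeletedBlocksTXTable();
    DeletedBlocksTransaction tx = txTable.get(1L);             // null if absent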
/**
* Startup options.
*/

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
.NodeRegistrationContainerReport;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
@ -78,7 +78,7 @@ public final class HddsTestUtils {
public static StorageContainerManager getScm(OzoneConfiguration conf)
throws IOException, AuthenticationException {
conf.setBoolean(OZONE_ENABLED, true);
SCMStorage scmStore = new SCMStorage(conf);
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
if(scmStore.getState() != Storage.StorageState.INITIALIZED) {
String clusterId = UUID.randomUUID().toString();
String scmId = UUID.randomUUID().toString();

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
@ -48,8 +49,12 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageTypeProto;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import java.io.File;
import java.io.IOException;
@ -59,6 +64,8 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
/**
 * Stateless helper functions to handle scm/datanode connections.
*/
@ -461,4 +468,20 @@ public final class TestUtils {
id, HddsProtos.LifeCycleEvent.QUASI_CLOSE);
}
public static StorageContainerManager getScm(OzoneConfiguration conf)
throws IOException, AuthenticationException {
conf.setBoolean(OZONE_ENABLED, true);
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
if(scmStore.getState() != Storage.StorageState.INITIALIZED) {
String clusterId = UUID.randomUUID().toString();
String scmId = UUID.randomUUID().toString();
scmStore.setClusterId(clusterId);
scmStore.setScmId(scmId);
// writes the version file properties
scmStore.initialize();
}
return StorageContainerManager.createSCM(null, conf);
}
}

View File

@ -17,24 +17,25 @@
package org.apache.hadoop.hdds.scm.block;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@ -49,11 +50,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.TimeoutException;
import org.junit.rules.TemporaryFolder;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
@ -64,6 +61,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.MB;
* Tests for SCM Block Manager.
*/
public class TestBlockManager implements EventHandler<Boolean> {
private StorageContainerManager scm;
private SCMContainerManager mapping;
private MockNodeManager nodeManager;
private PipelineManager pipelineManager;
@ -75,11 +73,14 @@ public class TestBlockManager implements EventHandler<Boolean> {
private static String containerOwner = "OZONE";
private static EventQueue eventQueue;
private int numContainerPerOwnerInPipeline;
private Configuration conf;
private OzoneConfiguration conf;
@Rule
public ExpectedException thrown = ExpectedException.none();
@Rule
public TemporaryFolder folder= new TemporaryFolder();
@Before
public void setUp() throws Exception {
conf = SCMTestUtils.getConf();
@ -87,24 +88,23 @@ public class TestBlockManager implements EventHandler<Boolean> {
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
String path = GenericTestUtils
.getTempPath(TestBlockManager.class.getSimpleName());
testDir = Paths.get(path).toFile();
testDir.delete();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, path);
eventQueue = new EventQueue();
boolean folderExisted = testDir.exists() || testDir.mkdirs();
if (!folderExisted) {
throw new IOException("Unable to create test directory path");
}
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, folder.newFolder().toString());
// Override the default Node Manager in SCM with this Mock Node Manager.
nodeManager = new MockNodeManager(true, 10);
pipelineManager =
new SCMPipelineManager(conf, nodeManager, eventQueue);
mapping = new SCMContainerManager(conf, nodeManager, pipelineManager,
eventQueue);
blockManager = new BlockManagerImpl(conf,
nodeManager, pipelineManager, mapping, eventQueue);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager);
SCMConfigurator configurator = new SCMConfigurator();
configurator.setScmNodeManager(nodeManager);
scm = getScm(conf, configurator);
// Initialize these fields so that the tests can pass.
mapping = (SCMContainerManager) scm.getContainerManager();
pipelineManager = scm.getPipelineManager();
blockManager = (BlockManagerImpl) scm.getScmBlockManager();
eventQueue = new EventQueue();
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
(BlockManagerImpl) scm.getScmBlockManager());
eventQueue.addHandler(SCMEvents.START_REPLICATION, this);
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(pipelineManager, mapping);
@ -121,16 +121,14 @@ public class TestBlockManager implements EventHandler<Boolean> {
@After
public void cleanup() throws IOException {
blockManager.close();
pipelineManager.close();
mapping.close();
FileUtil.fullyDelete(testDir);
scm.stop();
}
private static StorageContainerManager getScm(OzoneConfiguration conf)
private static StorageContainerManager getScm(OzoneConfiguration conf,
SCMConfigurator configurator)
throws IOException, AuthenticationException {
conf.setBoolean(OZONE_ENABLED, true);
SCMStorage scmStore = new SCMStorage(conf);
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
if(scmStore.getState() != StorageState.INITIALIZED) {
String clusterId = UUID.randomUUID().toString();
String scmId = UUID.randomUUID().toString();
@ -139,7 +137,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
// writes the version file properties
scmStore.initialize();
}
return StorageContainerManager.createSCM(null, conf);
return new StorageContainerManager(conf, configurator);
}
@Test

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -42,7 +44,8 @@ import org.apache.hadoop.hdds.protocol.proto
.DeleteBlockTransactionResult;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.db.Table;
import org.apache.hadoop.utils.db.TableIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -62,10 +65,12 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.when;
@ -78,6 +83,7 @@ public class TestDeletedBlockLog {
private OzoneConfiguration conf;
private File testDir;
private ContainerManager containerManager;
private StorageContainerManager scm;
private List<DatanodeDetails> dnList;
@Before
@ -85,10 +91,13 @@ public class TestDeletedBlockLog {
testDir = GenericTestUtils.getTestDir(
TestDeletedBlockLog.class.getSimpleName());
conf = new OzoneConfiguration();
conf.set(OZONE_ENABLED, "true");
conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
scm = TestUtils.getScm(conf);
containerManager = Mockito.mock(SCMContainerManager.class);
deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
scm.getScmMetadataStore());
dnList = new ArrayList<>(3);
setupContainerManager();
}
@ -126,6 +135,8 @@ public class TestDeletedBlockLog {
@After
public void tearDown() throws Exception {
deletedBlockLog.close();
scm.stop();
scm.join();
FileUtils.deleteDirectory(testDir);
}
@ -263,7 +274,6 @@ public class TestDeletedBlockLog {
MetadataKeyFilters.MetadataKeyFilter avoidLatestTxid =
(preKey, currentKey, nextKey) ->
!Arrays.equals(latestTxid, currentKey);
MetadataStore store = deletedBlockLog.getDeletedStore();
// Randomly add/get/commit/increase transactions.
for (int i = 0; i < 100; i++) {
int state = random.nextInt(4);
@ -286,9 +296,13 @@ public class TestDeletedBlockLog {
blocks = new ArrayList<>();
} else {
// verify the number of added and committed.
List<Map.Entry<byte[], byte[]>> result =
store.getRangeKVs(null, added, avoidLatestTxid);
Assert.assertEquals(added, result.size() + committed);
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
scm.getScmMetadataStore().getDeletedBlocksTXTable().iterator()) {
AtomicInteger count = new AtomicInteger();
iter.forEachRemaining((keyValue) -> count.incrementAndGet());
Assert.assertEquals(added, count.get() + committed);
}
}
}
blocks = getTransactions(1000);
@ -303,7 +317,8 @@ public class TestDeletedBlockLog {
// close db and reopen it again to make sure
// transactions are stored persistently.
deletedBlockLog.close();
deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
scm.getScmMetadataStore());
List<DeletedBlocksTransaction> blocks =
getTransactions(10);
commitTransactions(blocks);

View File

@ -23,13 +23,11 @@ import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.Optional;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@ -117,33 +115,11 @@ public final class OmUtils {
* Get the location where OM should store its metadata directories.
* Fall back to OZONE_METADATA_DIRS if not defined.
*
* @param conf
* @return
* @param conf - Config
   * @return File path, after creating all the required directories.
*/
public static File getOmDbDir(Configuration conf) {
final Collection<String> dbDirs = conf.getTrimmedStringCollection(
OMConfigKeys.OZONE_OM_DB_DIRS);
if (dbDirs.size() > 1) {
throw new IllegalArgumentException(
"Bad configuration setting " + OMConfigKeys.OZONE_OM_DB_DIRS +
". OM does not support multiple metadata dirs currently.");
}
if (dbDirs.size() == 1) {
final File dbDirPath = new File(dbDirs.iterator().next());
if (!dbDirPath.exists() && !dbDirPath.mkdirs()) {
throw new IllegalArgumentException("Unable to create directory " +
dbDirPath + " specified in configuration setting " +
OMConfigKeys.OZONE_OM_DB_DIRS);
}
return dbDirPath;
}
LOG.warn("{} is not configured. We recommend adding this setting. " +
"Falling back to {} instead.",
OMConfigKeys.OZONE_OM_DB_DIRS, HddsConfigKeys.OZONE_METADATA_DIRS);
return ServerUtils.getOzoneMetaDirPath(conf);
return ScmUtils.getDBPath(conf, OMConfigKeys.OZONE_OM_DB_DIRS);
}
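A hedged sketch of the shared helper this now delegates to, reconstructed from the OM-specific logic removed above; the actual ScmUtils.getDBPath implementation may differ in its messages and logging:
    public static File getDBPath(Configuration conf, String dbDirKey) {
      final Collection<String> dbDirs = conf.getTrimmedStringCollection(dbDirKey);
      if (dbDirs.size() > 1) {
        throw new IllegalArgumentException("Bad configuration setting " + dbDirKey
            + ". Multiple metadata dirs are not supported.");
      }
      if (dbDirs.size() == 1) {
        final File dbDirPath = new File(dbDirs.iterator().next());
        if (!dbDirPath.exists() && !dbDirPath.mkdirs()) {
          throw new IllegalArgumentException("Unable to create directory "
              + dbDirPath + " specified in " + dbDirKey);
        }
        return dbDirPath;
      }
      LOG.warn("{} is not configured. Falling back to {} instead.",
          dbDirKey, HddsConfigKeys.OZONE_METADATA_DIRS);
      return ServerUtils.getOzoneMetaDirPath(conf);
    }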
/**

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ipc.Client;
@ -40,7 +41,6 @@ import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.protocolPB
@ -437,12 +437,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
private StorageContainerManager createSCM()
throws IOException, AuthenticationException {
configureSCM();
SCMStorage scmStore = new SCMStorage(conf);
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
initializeScmStorage(scmStore);
return StorageContainerManager.createSCM(null, conf);
}
private void initializeScmStorage(SCMStorage scmStore) throws IOException {
private void initializeScmStorage(SCMStorageConfig scmStore)
throws IOException {
if (scmStore.getState() == StorageState.INITIALIZED) {
return;
}

View File

@ -36,7 +36,6 @@ import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -45,7 +44,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
@ -78,6 +77,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -114,6 +114,8 @@ public final class TestSecureOzoneCluster {
private OzoneManagerProtocolClientSideTranslatorPB omClient;
private KeyPair keyPair;
private Path metaDirPath;
@Rule
public TemporaryFolder folder= new TemporaryFolder();
@Before
public void init() {
@ -121,8 +123,7 @@ public final class TestSecureOzoneCluster {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
DefaultMetricsSystem.setMiniClusterMode(true);
final String path = GenericTestUtils
.getTempPath(UUID.randomUUID().toString());
final String path = folder.newFolder().toString();
metaDirPath = Paths.get(path, "om-meta");
conf.set(OZONE_METADATA_DIRS, metaDirPath.toString());
startMiniKdc();
@ -149,23 +150,22 @@ public final class TestSecureOzoneCluster {
if (omClient != null) {
omClient.close();
}
FileUtils.deleteQuietly(metaDirPath.toFile());
} catch (Exception e) {
logger.error("Failed to stop TestSecureOzoneCluster", e);
}
}
private void createCredentialsInKDC(Configuration conf, MiniKdc miniKdc)
throws Exception {
private void createCredentialsInKDC(Configuration configuration,
MiniKdc kdc) throws Exception {
createPrincipal(scmKeytab,
conf.get(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY));
configuration.get(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY));
createPrincipal(spnegoKeytab,
conf.get(ScmConfigKeys
configuration.get(ScmConfigKeys
.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY));
conf.get(OMConfigKeys
configuration.get(OMConfigKeys
.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY);
createPrincipal(omKeyTab,
conf.get(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY));
configuration.get(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY));
}
private void createPrincipal(File keytab, String... principal)
@ -185,37 +185,39 @@ public final class TestSecureOzoneCluster {
miniKdc.stop();
}
private void setSecureConfig(Configuration conf) throws IOException {
conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
conf.setBoolean(OZONE_ENABLED, true);
String host = InetAddress.getLocalHost().getCanonicalHostName();
private void setSecureConfig(Configuration configuration) throws IOException {
configuration.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
configuration.setBoolean(OZONE_ENABLED, true);
String host = InetAddress.getLocalHost().getCanonicalHostName()
.toLowerCase();
String realm = miniKdc.getRealm();
curUser = UserGroupInformation.getCurrentUser()
.getUserName();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
configuration.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
conf.set(OZONE_ADMINISTRATORS, curUser);
configuration.set(OZONE_ADMINISTRATORS, curUser);
conf.set(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
configuration.set(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
"scm/" + host + "@" + realm);
conf.set(ScmConfigKeys.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY,
configuration.set(ScmConfigKeys.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY,
"HTTP_SCM/" + host + "@" + realm);
conf.set(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY,
configuration.set(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY,
"om/" + host + "@" + realm);
conf.set(OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY,
configuration.set(OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY,
"HTTP_OM/" + host + "@" + realm);
scmKeytab = new File(workDir, "scm.keytab");
spnegoKeytab = new File(workDir, "http.keytab");
omKeyTab = new File(workDir, "om.keytab");
conf.set(ScmConfigKeys.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY,
configuration.set(ScmConfigKeys.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY,
scmKeytab.getAbsolutePath());
conf.set(
configuration.set(
ScmConfigKeys.HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY,
spnegoKeytab.getAbsolutePath());
conf.set(OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
configuration.set(OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
omKeyTab.getAbsolutePath());
conf.set(OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE,
spnegoKeytab.getAbsolutePath());
@ -239,12 +241,15 @@ public final class TestSecureOzoneCluster {
scmId = UUID.randomUUID().toString();
omId = UUID.randomUUID().toString();
final String path = GenericTestUtils
.getTempPath(UUID.randomUUID().toString());
final String path = folder.newFolder().toString();
Path scmPath = Paths.get(path, "scm-meta");
File temp = scmPath.toFile();
if(!temp.exists()) {
temp.mkdirs();
}
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
SCMStorage scmStore = new SCMStorage(conf);
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
scmStore.setClusterId(clusterId);
scmStore.setScmId(scmId);
// writes the version file properties
@ -586,5 +591,4 @@ public final class TestSecureOzoneCluster {
CertificateClient certClient = new CertificateClientTestImpl(config);
om.setCertClient(certClient);
}
}

View File

@ -51,7 +51,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
@ -398,7 +398,7 @@ public class TestStorageContainerManager {
// This will initialize SCM
StorageContainerManager.scmInit(conf);
SCMStorage scmStore = new SCMStorage(conf);
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
Assert.assertEquals("testClusterId", scmStore.getClusterID());
StartupOption.INIT.setClusterId("testClusterIdNew");
@ -422,7 +422,7 @@ public class TestStorageContainerManager {
StartupOption.INIT.setClusterId("testClusterId");
// This will initialize SCM
StorageContainerManager.scmInit(conf);
SCMStorage scmStore = new SCMStorage(conf);
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
cluster.shutdown();
@ -438,7 +438,8 @@ public class TestStorageContainerManager {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
exception.expect(SCMException.class);
exception.expectMessage("SCM not initialized.");
exception.expectMessage(
"SCM not initialized due to storage config failure");
StorageContainerManager.createSCM(null, conf);
}
@ -463,7 +464,7 @@ public class TestStorageContainerManager {
Path scmPath = Paths.get(path, "scm-meta");
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
SCMStorage scmStore = new SCMStorage(conf);
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
String clusterId = UUID.randomUUID().toString();
String scmId = UUID.randomUUID().toString();
scmStore.setClusterId(clusterId);

View File

@ -220,9 +220,9 @@ public class TestBlockDeletion {
private void verifyTransactionsCommitted() throws IOException {
DeletedBlockLogImpl deletedBlockLog =
(DeletedBlockLogImpl) scm.getScmBlockManager().getDeletedBlockLog();
for (int txnID = 1; txnID <= maxTransactionId; txnID++) {
for (long txnID = 1; txnID <= maxTransactionId; txnID++) {
Assert.assertNull(
deletedBlockLog.getDeletedStore().get(Longs.toByteArray(txnID)));
scm.getScmMetadataStore().getDeletedBlocksTXTable().get(txnID));
}
}

View File

@ -21,6 +21,7 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.net.NetUtils;
@ -31,7 +32,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.protocol.proto
@ -1311,11 +1311,11 @@ public class TestOzoneManager {
public void testOmInitialization() throws IOException {
// Read the version file info from OM version file
OMStorage omStorage = cluster.getOzoneManager().getOmStorage();
SCMStorage scmStorage = new SCMStorage(conf);
SCMStorageConfig scmStorageConfig = new SCMStorageConfig(conf);
// asserts whether cluster Id and SCM ID are properly set in SCM Version
// file.
Assert.assertEquals(clusterId, scmStorage.getClusterID());
Assert.assertEquals(scmId, scmStorage.getScmId());
Assert.assertEquals(clusterId, scmStorageConfig.getClusterID());
Assert.assertEquals(scmId, scmStorageConfig.getScmId());
// asserts whether OM Id is properly set in OM Version file.
Assert.assertEquals(omId, omStorage.getOmId());
// asserts whether the SCM info is correct in OM Version file.

View File

@ -127,8 +127,8 @@ public class TestContainerSQLCli {
new SCMPipelineManager(conf, nodeManager, eventQueue);
containerManager = new SCMContainerManager(conf, nodeManager,
pipelineManager, eventQueue);
blockManager = new BlockManagerImpl(
conf, nodeManager, pipelineManager, containerManager, eventQueue);
blockManager =
new BlockManagerImpl(conf, cluster.getStorageContainerManager());
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager);
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false);
GenericTestUtils.waitFor(() -> {

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
 * A tool to convert Ozone Manager metadata to SQL DB.
*/
package org.apache.hadoop.ozone.scm;