HDDS-981. Block allocation should involve pipeline selection and then container selection.

Contributed by Lokesh Jain.
Nanda kumar committed on 2019-02-08 15:43:58 +05:30
parent a140a890c6
commit 0c1bc4dcee
37 changed files with 794 additions and 304 deletions
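
In short, the change moves block allocation from "pick any OPEN container of the right type and factor" to "pick an OPEN pipeline first, then pick or allocate a container inside that pipeline". The following is a simplified sketch of the new flow (not the committed code; error handling and the retry loop are trimmed, and the local variable names are illustrative), condensed from the BlockManagerImpl hunk later in this commit:

  // Sketch only: pipeline-first block allocation, simplified from allocateBlock().
  List<Pipeline> openPipelines = pipelineManager
      .getPipelines(type, factor, Pipeline.PipelineState.OPEN);
  // 1. Select an OPEN pipeline, creating one if none is available yet.
  Pipeline pipeline = openPipelines.isEmpty()
      ? pipelineManager.createPipeline(type, factor)
      : openPipelines.get((int) (Math.random() * openPipelines.size()));
  // 2. Ask the container manager for a container that belongs to that
  //    pipeline and still has room for the requested size.
  ContainerInfo containerInfo =
      containerManager.getMatchingContainer(size, owner, pipeline);
  // 3. Build the block from the selected container (null if nothing matched).
  return containerInfo != null ? newBlock(containerInfo) : null;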

View File

@@ -312,9 +312,9 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY =
       "ozone.scm.container.placement.impl";
-  public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE =
-      "ozone.scm.container.provision_batch_size";
-  public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 20;
+  public static final String OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT =
+      "ozone.scm.pipeline.owner.container.count";
+  public static final int OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT = 3;
   public static final String
       OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
@@ -332,6 +332,11 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT =
       "300s";
+  public static final String OZONE_SCM_PIPELINE_CREATION_INTERVAL =
+      "ozone.scm.pipeline.creation.interval";
+  public static final String OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT =
+      "120s";
   public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
       "ozone.scm.block.deletion.max.retry";
   public static final int OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT = 4096;

View File

@@ -45,6 +45,11 @@ public final class Pipeline {
   private PipelineState state;
   private Map<DatanodeDetails, Long> nodeStatus;
 
+  /**
+   * The immutable properties of pipeline object is used in
+   * ContainerStateManager#getMatchingContainerByPipeline to take a lock on
+   * the container allocations for a particular pipeline.
+   */
   private Pipeline(PipelineID id, ReplicationType type,
       ReplicationFactor factor, PipelineState state,
       Map<DatanodeDetails, Long> nodeStatus) {

View File

@@ -760,11 +760,10 @@
     </description>
   </property>
   <property>
-    <name>ozone.scm.container.provision_batch_size</name>
-    <value>20</value>
-    <tag>OZONE, PERFORMANCE</tag>
-    <description>Pre-provision specified number of containers for block
-      allocation.
+    <name>ozone.scm.pipeline.owner.container.count</name>
+    <value>3</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>Number of containers per owner in a pipeline.
     </description>
   </property>
   <property>
@@ -1221,6 +1220,15 @@
       before destroying a pipeline.
     </description>
   </property>
+  <property>
+    <name>ozone.scm.pipeline.creation.interval</name>
+    <value>120s</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>
+      SCM schedules a fixed interval job using the configured interval to
+      create pipelines.
+    </description>
+  </property>
   <property>
     <name>hdds.scm.chillmode.threshold.pct</name>
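
As a hedged illustration (not part of this commit), the two new settings above could also be set programmatically; the key constants come from the ScmConfigKeys hunk earlier, the values shown are just the defaults, and the usual OzoneConfiguration/TimeUnit imports are assumed:

  OzoneConfiguration conf = new OzoneConfiguration();
  // Containers kept per owner in each pipeline (default 3).
  conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3);
  // Interval of the fixed-interval pipeline creation job (default 120s).
  conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
      120, TimeUnit.SECONDS);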

View File

@@ -29,7 +29,6 @@
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -49,7 +48,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
@@ -80,8 +78,6 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
   private final DeletedBlockLog deletedBlockLog;
   private final SCMBlockDeletingService blockDeletingService;
 
-  private final int containerProvisionBatchSize;
-  private final Random rand;
   private ObjectName mxBean;
   private ChillModePrecheck chillModePrecheck;
@@ -107,12 +103,6 @@ public BlockManagerImpl(final Configuration conf,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
         StorageUnit.BYTES);
-    this.containerProvisionBatchSize =
-        conf.getInt(
-            ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
-            ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
-    rand = new Random();
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
 
     // SCM block deleting transaction log and deleting service.
@@ -151,32 +141,6 @@ public void stop() throws IOException {
     this.close();
   }
 
-  /**
-   * Pre allocate specified count of containers for block creation.
-   *
-   * @param count - Number of containers to allocate.
-   * @param type - Type of containers
-   * @param factor - how many copies needed for this container.
-   * @throws IOException
-   */
-  private synchronized void preAllocateContainers(int count,
-      ReplicationType type, ReplicationFactor factor, String owner) {
-    for (int i = 0; i < count; i++) {
-      ContainerInfo containerInfo;
-      try {
-        // TODO: Fix this later when Ratis is made the Default.
-        containerInfo = containerManager.allocateContainer(
-            type, factor, owner);
-        if (containerInfo == null) {
-          LOG.warn("Unable to allocate container.");
-        }
-      } catch (IOException ex) {
-        LOG.warn("Unable to allocate container.", ex);
-      }
-    }
-  }
-
   /**
    * Allocates a block in a container and returns that info.
    *
@@ -201,52 +165,44 @@ public AllocatedBlock allocateBlock(final long size,
     /*
       Here is the high level logic.
 
-      1. We try to find containers in open state.
-
-      2. If there are no containers in open state, then we will pre-allocate a
-      bunch of containers in SCM and try again.
-
-      TODO : Support random picking of two containers from the list. So we can
-      use different kind of policies.
+      1. We try to find pipelines in open state.
+
+      2. If there are no pipelines in OPEN state, then we try to create one.
+
+      3. We allocate a block from the available containers in the selected
+      pipeline.
+
+      TODO : #CLUTIL Support random picking of two containers from the list.
+      So we can use different kind of policies.
     */
 
     ContainerInfo containerInfo;
+    while (true) {
+      List<Pipeline> availablePipelines = pipelineManager
+          .getPipelines(type, factor, Pipeline.PipelineState.OPEN);
+      Pipeline pipeline;
+      if (availablePipelines.size() == 0) {
+        try {
+          // TODO: #CLUTIL Remove creation logic when all replication types and
+          // factors are handled by pipeline creator
+          pipeline = pipelineManager.createPipeline(type, factor);
+        } catch (IOException e) {
+          break;
+        }
+      } else {
+        // TODO: #CLUTIL Make the selection policy driven.
+        pipeline = availablePipelines
+            .get((int) (Math.random() * availablePipelines.size()));
+      }
 
       // look for OPEN containers that match the criteria.
       containerInfo = containerManager
-          .getMatchingContainer(size, owner, type, factor,
-              HddsProtos.LifeCycleState.OPEN);
-
-      // We did not find OPEN Containers. This generally means
-      // that most of our containers are full or we have not allocated
-      // containers of the type and replication factor. So let us go and
-      // allocate some.
-
-      // Even though we have already checked the containers in OPEN
-      // state, we have to check again as we only hold a read lock.
-      // Some other thread might have pre-allocated container in meantime.
-      if (containerInfo == null) {
-        synchronized (this) {
-          if (!containerManager.getContainers(HddsProtos.LifeCycleState.OPEN)
-              .isEmpty()) {
-            containerInfo = containerManager
-                .getMatchingContainer(size, owner, type, factor,
-                    HddsProtos.LifeCycleState.OPEN);
-          }
-          if (containerInfo == null) {
-            preAllocateContainers(containerProvisionBatchSize, type, factor,
-                owner);
-            containerInfo = containerManager
-                .getMatchingContainer(size, owner, type, factor,
-                    HddsProtos.LifeCycleState.OPEN);
-          }
-        }
-      }
+          .getMatchingContainer(size, owner, pipeline);
 
       if (containerInfo != null) {
         return newBlock(containerInfo);
       }
+    }
 
     // we have tried all strategies we know and but somehow we are not able
     // to get a container for this block. Log that info and return a null.

View File

@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
     .NodeRegistrationContainerReport;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -62,11 +63,13 @@ public class SCMChillModeManager implements
   private static final String PIPELINE_EXIT_RULE = "PipelineChillModeRule";
 
   private final EventQueue eventPublisher;
+  private final PipelineManager pipelineManager;
 
   public SCMChillModeManager(Configuration conf,
       List<ContainerInfo> allContainers, PipelineManager pipelineManager,
       EventQueue eventQueue) {
     this.config = conf;
+    this.pipelineManager = pipelineManager;
     this.eventPublisher = eventQueue;
     this.isChillModeEnabled = conf.getBoolean(
         HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
@@ -128,6 +131,10 @@ public void exitChillMode(EventPublisher eventQueue) {
       e.cleanup();
     }
     emitChillModeStatus();
+    // TODO: #CLUTIL if we reenter chill mode the fixed interval pipeline
+    // creation job needs to stop
+    RatisPipelineUtils
+        .scheduleFixedIntervalPipelineCreator(pipelineManager, config);
   }
 
   @Override

View File

@@ -16,10 +16,8 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -146,10 +144,12 @@ void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
       throws IOException;
 
   /**
-   * Returns the ContainerInfo.
-   * @return NodeManager
+   * Returns ContainerInfo which matches the requirements.
+   * @param size - the amount of space required in the container
+   * @param owner - the user which requires space in its owned container
+   * @param pipeline - pipeline to which the container should belong
+   * @return ContainerInfo for the matching container.
    */
-  ContainerInfo getMatchingContainer(long size,
-      String owner, ReplicationType type, ReplicationFactor factor,
-      LifeCycleState state) throws IOException;
+  ContainerInfo getMatchingContainer(long size, String owner,
+      Pipeline pipeline);
 }

View File

@@ -26,6 +26,7 @@
 import org.apache.hadoop.hdds.scm.container.states.ContainerState;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
@@ -120,6 +121,7 @@ public class ContainerStateManager {
   private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
   private final ContainerStateMap containers;
   private final AtomicLong containerCount;
+  private final int numContainerPerOwnerInPipeline;
 
   /**
    * Constructs a Container State Manager that tracks all containers owned by
@@ -150,6 +152,9 @@ public ContainerStateManager(final Configuration configuration) {
     this.lastUsedMap = new ConcurrentHashMap<>();
     this.containerCount = new AtomicLong(0);
     this.containers = new ContainerStateMap();
+    this.numContainerPerOwnerInPipeline = configuration
+        .getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
+            ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
   }
 
   /*
@@ -246,6 +251,8 @@ ContainerInfo allocateContainer(final PipelineManager pipelineManager,
     Pipeline pipeline;
     try {
+      // TODO: #CLUTIL remove creation logic when all replication types and
+      // factors are handled by pipeline creator job.
       pipeline = pipelineManager.createPipeline(type, replicationFactor);
     } catch (IOException e) {
       final List<Pipeline> pipelines = pipelineManager
@@ -257,10 +264,25 @@ ContainerInfo allocateContainer(final PipelineManager pipelineManager,
       }
       pipeline = pipelines.get((int) containerCount.get() % pipelines.size());
     }
+    return allocateContainer(pipelineManager, owner, pipeline);
+  }
 
-    Preconditions.checkNotNull(pipeline, "Pipeline type=%s/"
-        + "replication=%s couldn't be found for the new container. "
-        + "Do you have enough nodes?", type, replicationFactor);
+  /**
+   * Allocates a new container based on the type, replication etc.
+   *
+   * @param pipelineManager - Pipeline Manager class.
+   * @param owner - Owner of the container.
+   * @param pipeline - Pipeline to which the container needs to be
+   *                 allocated.
+   * @return ContainerWithPipeline
+   * @throws IOException on Failure.
+   */
+  ContainerInfo allocateContainer(
+      final PipelineManager pipelineManager, final String owner,
+      Pipeline pipeline) throws IOException {
+    Preconditions.checkNotNull(pipeline,
+        "Pipeline couldn't be found for the new container. "
+            + "Do you have enough nodes?");
 
     final long containerID = containerCount.incrementAndGet();
     final ContainerInfo containerInfo = new ContainerInfo.Builder()
@@ -272,7 +294,7 @@ ContainerInfo allocateContainer(final PipelineManager pipelineManager,
         .setOwner(owner)
         .setContainerID(containerID)
         .setDeleteTransactionId(0)
-        .setReplicationFactor(replicationFactor)
+        .setReplicationFactor(pipeline.getFactor())
         .setReplicationType(pipeline.getType())
         .build();
     pipelineManager.addContainerToPipeline(pipeline.getId(),
@@ -345,37 +367,81 @@ void updateDeleteTransactionId(
    *
    * @param size - Space needed in the Container.
    * @param owner - Owner of the container - A specific nameservice.
-   * @param type - Replication Type {StandAlone, Ratis}
-   * @param factor - Replication Factor {ONE, THREE}
-   * @param state - State of the Container-- {Open, Allocated etc.}
+   * @param pipelineManager - Pipeline Manager
+   * @param pipeline - Pipeline from which container needs to be matched
    * @return ContainerInfo, null if there is no match found.
    */
-  ContainerInfo getMatchingContainer(final long size,
-      String owner, ReplicationType type, ReplicationFactor factor,
-      LifeCycleState state) {
-    // Find containers that match the query spec, if no match return null.
-    final NavigableSet<ContainerID> matchingSet =
-        containers.getMatchingContainerIDs(state, owner, factor, type);
-    if (matchingSet == null || matchingSet.size() == 0) {
+  ContainerInfo getMatchingContainer(final long size, String owner,
+      PipelineManager pipelineManager, Pipeline pipeline) throws IOException {
+    NavigableSet<ContainerID> containerIDs =
+        pipelineManager.getContainersInPipeline(pipeline.getId());
+    if (containerIDs == null) {
+      LOG.error("Container list is null for pipeline=", pipeline.getId());
+      return null;
+    }
+
+    getContainers(containerIDs, owner);
+    if (containerIDs.size() < numContainerPerOwnerInPipeline) {
+      synchronized (pipeline) {
+        // TODO: #CLUTIL Maybe we can add selection logic inside synchronized
+        // as well
+        containerIDs = getContainers(
+            pipelineManager.getContainersInPipeline(pipeline.getId()), owner);
+        if (containerIDs.size() < numContainerPerOwnerInPipeline) {
+          ContainerInfo containerInfo =
+              allocateContainer(pipelineManager, owner, pipeline);
+          lastUsedMap.put(new ContainerState(owner, pipeline.getId()),
+              containerInfo.containerID());
+          return containerInfo;
+        }
+      }
+    }
+
+    ContainerInfo containerInfo =
+        getMatchingContainer(size, owner, pipeline.getId(), containerIDs);
+    if (containerInfo == null) {
+      synchronized (pipeline) {
+        containerInfo =
+            allocateContainer(pipelineManager, owner, pipeline);
+        lastUsedMap.put(new ContainerState(owner, pipeline.getId()),
+            containerInfo.containerID());
+      }
+    }
+    // TODO: #CLUTIL cleanup entries in lastUsedMap
+    return containerInfo;
+  }
+
+  /**
+   * Return a container matching the attributes specified.
+   *
+   * @param size - Space needed in the Container.
+   * @param owner - Owner of the container - A specific nameservice.
+   * @param pipelineID - ID of the pipeline
+   * @param containerIDs - Set of containerIDs to choose from
+   * @return ContainerInfo, null if there is no match found.
+   */
+  ContainerInfo getMatchingContainer(final long size, String owner,
+      PipelineID pipelineID, NavigableSet<ContainerID> containerIDs) {
+    if (containerIDs.isEmpty()) {
       return null;
     }
 
     // Get the last used container and find container above the last used
     // container ID.
-    final ContainerState key = new ContainerState(owner, type, factor);
-    final ContainerID lastID = lastUsedMap
-        .getOrDefault(key, matchingSet.first());
+    final ContainerState key = new ContainerState(owner, pipelineID);
+    final ContainerID lastID =
+        lastUsedMap.getOrDefault(key, containerIDs.first());
 
     // There is a small issue here. The first time, we will skip the first
     // container. But in most cases it will not matter.
-    NavigableSet<ContainerID> resultSet = matchingSet.tailSet(lastID, false);
+    NavigableSet<ContainerID> resultSet = containerIDs.tailSet(lastID, false);
     if (resultSet.size() == 0) {
-      resultSet = matchingSet;
+      resultSet = containerIDs;
     }
 
     ContainerInfo selectedContainer =
-        findContainerWithSpace(size, resultSet, owner);
+        findContainerWithSpace(size, resultSet, owner, pipelineID);
     if (selectedContainer == null) {
 
       // If we did not find any space in the tailSet, we need to look for
@@ -386,15 +452,17 @@ ContainerInfo getMatchingContainer(final long size,
       // not true. Hence we need to include the last used container as the
       // last element in the sorted set.
-      resultSet = matchingSet.headSet(lastID, true);
-      selectedContainer = findContainerWithSpace(size, resultSet, owner);
+      resultSet = containerIDs.headSet(lastID, true);
+      selectedContainer =
+          findContainerWithSpace(size, resultSet, owner, pipelineID);
     }
-    return selectedContainer;
 
+    return selectedContainer;
   }
 
   private ContainerInfo findContainerWithSpace(final long size,
-      final NavigableSet<ContainerID> searchSet, final String owner) {
+      final NavigableSet<ContainerID> searchSet, final String owner,
+      final PipelineID pipelineID) {
     try {
       // Get the container with space to meet our request.
       for (ContainerID id : searchSet) {
@@ -402,9 +470,7 @@ private ContainerInfo findContainerWithSpace(final long size,
         if (containerInfo.getUsedBytes() + size <= this.containerSize) {
           containerInfo.updateLastUsedTime();
 
-          final ContainerState key = new ContainerState(owner,
-              containerInfo.getReplicationType(),
-              containerInfo.getReplicationFactor());
+          final ContainerState key = new ContainerState(owner, pipelineID);
           lastUsedMap.put(key, containerInfo.containerID());
           return containerInfo;
         }
@@ -457,6 +523,22 @@ ContainerInfo getContainer(final ContainerID containerID)
     return containers.getContainerInfo(containerID);
   }
 
+  private NavigableSet<ContainerID> getContainers(
+      NavigableSet<ContainerID> containerIDs, String owner) {
+    for (ContainerID cid : containerIDs) {
+      try {
+        if (!getContainer(cid).getOwner().equals(owner)) {
+          containerIDs.remove(cid);
+        }
+      } catch (ContainerNotFoundException e) {
+        LOG.error("Could not find container info for container id={} {}", cid,
+            e);
+        containerIDs.remove(cid);
+      }
+    }
+    return containerIDs;
+  }
+
   void close() throws IOException {
   }

View File

@@ -20,11 +20,11 @@
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -50,14 +50,8 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_SIZE;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_CONTAINER_STATE;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
 
 /**
@@ -73,8 +67,6 @@ public class SCMContainerManager implements ContainerManager {
   private final MetadataStore containerStore;
   private final PipelineManager pipelineManager;
   private final ContainerStateManager containerStateManager;
-  private final EventPublisher eventPublisher;
-  private final long size;
 
   /**
    * Constructs a mapping class that creates mapping between container names
@@ -106,11 +98,8 @@ public SCMContainerManager(final Configuration conf,
         .build();
 
     this.lock = new ReentrantLock();
-    this.size = (long) conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
-        OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
     this.pipelineManager = pipelineManager;
     this.containerStateManager = new ContainerStateManager(conf);
-    this.eventPublisher = eventPublisher;
 
     loadExistingContainers();
   }
@@ -291,11 +280,14 @@ public HddsProtos.LifeCycleState updateContainerState(
     // Should we return the updated ContainerInfo instead of LifeCycleState?
     lock.lock();
     try {
+      ContainerInfo container = containerStateManager.getContainer(containerID);
       ContainerInfo updatedContainer =
           updateContainerStateInternal(containerID, event);
-      if (!updatedContainer.isOpen()) {
-        pipelineManager.removeContainerFromPipeline(
-            updatedContainer.getPipelineID(), containerID);
+      if (updatedContainer.getState() != LifeCycleState.OPEN
+          && container.getState() == LifeCycleState.OPEN) {
+        pipelineManager
+            .removeContainerFromPipeline(updatedContainer.getPipelineID(),
+                containerID);
       }
       final byte[] dbKey = Longs.toByteArray(containerID.getId());
       containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
@@ -361,16 +353,21 @@ public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
    *
    * @param sizeRequired - Space needed in the Container.
    * @param owner - Owner of the container - A specific nameservice.
-   * @param type - Replication Type {StandAlone, Ratis}
-   * @param factor - Replication Factor {ONE, THREE}
-   * @param state - State of the Container-- {Open, Allocated etc.}
+   * @param pipeline - Pipeline to which the container should belong.
    * @return ContainerInfo, null if there is no match found.
    */
-  public ContainerInfo getMatchingContainer(
-      final long sizeRequired, String owner, ReplicationType type,
-      ReplicationFactor factor, LifeCycleState state) throws IOException {
-    return containerStateManager.getMatchingContainer(
-        sizeRequired, owner, type, factor, state);
+  public ContainerInfo getMatchingContainer(final long sizeRequired,
+      String owner, Pipeline pipeline) {
+    try {
+      //TODO: #CLUTIL See if lock is required here
+      return containerStateManager
+          .getMatchingContainer(sizeRequired, owner, pipelineManager,
+              pipeline);
+    } catch (Exception e) {
+      LOG.warn("Container allocation failed for pipeline={} requiredSize={} {}",
+          pipeline, sizeRequired, e);
+      return null;
+    }
   }
 
   /**

View File

@@ -20,43 +20,30 @@
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 
 /**
  * Class that acts as the container state.
  */
 public class ContainerState {
-  private final HddsProtos.ReplicationType type;
   private final String owner;
-  private final HddsProtos.ReplicationFactor replicationFactor;
+  private final PipelineID pipelineID;
 
   /**
    * Constructs a Container Key.
    *
    * @param owner - Container Owners
-   * @param type - Replication Type.
-   * @param factor - Replication Factors
+   * @param pipelineID - ID of the pipeline
    */
-  public ContainerState(String owner, HddsProtos.ReplicationType type,
-      HddsProtos.ReplicationFactor factor) {
-    this.type = type;
+  public ContainerState(String owner, PipelineID pipelineID) {
+    this.pipelineID = pipelineID;
     this.owner = owner;
-    this.replicationFactor = factor;
-  }
-
-  public HddsProtos.ReplicationType getType() {
-    return type;
   }
 
   public String getOwner() {
     return owner;
   }
 
-  public HddsProtos.ReplicationFactor getFactor() {
-    return replicationFactor;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -70,27 +57,24 @@ public boolean equals(Object o) {
     ContainerState that = (ContainerState) o;
 
     return new EqualsBuilder()
-        .append(type, that.type)
         .append(owner, that.owner)
-        .append(replicationFactor, that.replicationFactor)
+        .append(pipelineID, that.pipelineID)
         .isEquals();
   }
 
   @Override
   public int hashCode() {
     return new HashCodeBuilder(137, 757)
-        .append(type)
         .append(owner)
-        .append(replicationFactor)
+        .append(pipelineID)
         .toHashCode();
   }
 
   @Override
   public String toString() {
     return "ContainerKey{" +
-        ", type=" + type +
         ", owner=" + owner +
-        ", replicationFactor=" + replicationFactor +
+        ", pipelineID=" + pipelineID +
         '}';
   }
 }

View File

@@ -168,6 +168,13 @@ public final class SCMEvents {
   public static final TypedEvent<DatanodeDetails> DEAD_NODE =
       new TypedEvent<>(DatanodeDetails.class, "Dead_Node");
 
+  /**
+   * This event will be triggered whenever a datanode is moved from non-healthy
+   * state to healthy state.
+   */
+  public static final TypedEvent<DatanodeDetails> NON_HEALTHY_TO_HEALTHY_NODE =
+      new TypedEvent<>(DatanodeDetails.class, "NON_HEALTHY_TO_HEALTHY_NODE");
+
   /**
    * This event will be triggered by CommandStatusReportHandler whenever a
    * status for Replication SCMCommand is received.

View File

@@ -18,7 +18,10 @@
 package org.apache.hadoop.hdds.scm.node;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -27,9 +30,18 @@
  */
 public class NewNodeHandler implements EventHandler<DatanodeDetails> {
 
+  private final PipelineManager pipelineManager;
+  private final Configuration conf;
+
+  public NewNodeHandler(PipelineManager pipelineManager, Configuration conf) {
+    this.pipelineManager = pipelineManager;
+    this.conf = conf;
+  }
+
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
-    // We currently have nothing to do when we receive new node event.
+    RatisPipelineUtils
+        .triggerPipelineCreation(pipelineManager, conf, 0);
   }
 }

View File

@@ -156,6 +156,8 @@ public NodeStateManager(Configuration conf, EventPublisher eventPublisher) {
   private void initialiseState2EventMap() {
     state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE);
     state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE);
+    state2EventMap
+        .put(NodeState.HEALTHY, SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE);
   }
 
   /*

View File

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+/**
+ * Handles Stale node event.
+ */
+public class NonHealthyToHealthyNodeHandler
+    implements EventHandler<DatanodeDetails> {
+
+  private final PipelineManager pipelineManager;
+  private final Configuration conf;
+
+  public NonHealthyToHealthyNodeHandler(
+      PipelineManager pipelineManager, OzoneConfiguration conf) {
+    this.pipelineManager = pipelineManager;
+    this.conf = conf;
+  }
+
+  @Override
+  public void onMessage(DatanodeDetails datanodeDetails,
+      EventPublisher publisher) {
+    RatisPipelineUtils.triggerPipelineCreation(pipelineManager, conf, 0);
+  }
+}

View File

@@ -26,7 +26,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
+import java.util.NavigableSet;
 
 /**
  * Interface which exposes the api for pipeline management.
@@ -57,7 +57,7 @@ void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
   void removeContainerFromPipeline(PipelineID pipelineID,
       ContainerID containerID) throws IOException;
 
-  Set<ContainerID> getContainersInPipeline(PipelineID pipelineID)
+  NavigableSet<ContainerID> getContainersInPipeline(PipelineID pipelineID)
       throws IOException;
 
   int getNumberOfContainers(PipelineID pipelineID) throws IOException;

View File

@@ -26,7 +26,7 @@
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
+import java.util.NavigableSet;
 
 /**
  * Manages the state of pipelines in SCM. All write operations like pipeline
@@ -77,7 +77,8 @@ List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) {
     return pipelineStateMap.getPipelines(type, states);
   }
 
-  Set<ContainerID> getContainers(PipelineID pipelineID) throws IOException {
+  NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
+      throws IOException {
     return pipelineStateMap.getContainers(pipelineID);
   }

View File

@@ -41,7 +41,7 @@ class PipelineStateMap {
       PipelineStateMap.class);
 
   private final Map<PipelineID, Pipeline> pipelineMap;
-  private final Map<PipelineID, Set<ContainerID>> pipeline2container;
+  private final Map<PipelineID, NavigableSet<ContainerID>> pipeline2container;
 
   PipelineStateMap() {
@@ -202,17 +202,17 @@ List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
    * @return Set of containerIDs belonging to the pipeline
    * @throws IOException if pipeline is not found
    */
-  Set<ContainerID> getContainers(PipelineID pipelineID)
+  NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
       throws PipelineNotFoundException {
     Preconditions.checkNotNull(pipelineID,
         "Pipeline Id cannot be null");
 
-    Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
+    NavigableSet<ContainerID> containerIDs = pipeline2container.get(pipelineID);
     if (containerIDs == null) {
       throw new PipelineNotFoundException(
           String.format("%s not found", pipelineID));
     }
-    return new HashSet<>(containerIDs);
+    return new TreeSet<>(containerIDs);
   }
 
   /**

View File

@@ -19,10 +19,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.grpc.GrpcTlsConfig;
@@ -42,6 +44,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Utility class for Ratis pipelines. Contains methods to create and destroy
@@ -51,6 +54,8 @@ public final class RatisPipelineUtils {
   private static TimeoutScheduler timeoutScheduler =
       TimeoutScheduler.newInstance(1);
+  private static AtomicBoolean isPipelineCreatorRunning =
+      new AtomicBoolean(false);
 
   private static final Logger LOG =
       LoggerFactory.getLogger(RatisPipelineUtils.class);
@@ -60,6 +65,7 @@ private RatisPipelineUtils() {
   /**
    * Sends ratis command to create pipeline on all the datanodes.
+   *
    * @param pipeline - Pipeline to be created
    * @param ozoneConf - Ozone Confinuration
    * @throws IOException if creation fails
@@ -75,6 +81,7 @@ public static void createPipeline(Pipeline pipeline, Configuration ozoneConf)
   /**
    * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
    * the datanodes.
+   *
    * @param pipelineManager - SCM pipeline manager
    * @param pipeline - Pipeline to be destroyed
    * @param ozoneConf - Ozone configuration
@@ -84,22 +91,29 @@ public static void destroyPipeline(PipelineManager pipelineManager,
       Pipeline pipeline, Configuration ozoneConf) throws IOException {
     final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
     LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      try {
+        destroyPipeline(dn, pipeline.getId(), ozoneConf);
+      } catch (IOException e) {
+        LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
+            pipeline.getId(), dn);
+      }
+    }
     // remove the pipeline from the pipeline manager
     pipelineManager.removePipeline(pipeline.getId());
-    for (DatanodeDetails dn : pipeline.getNodes()) {
-      destroyPipeline(dn, pipeline.getId(), ozoneConf);
-    }
+    triggerPipelineCreation(pipelineManager, ozoneConf, 0);
   }
 
   /**
    * Finalizes pipeline in the SCM. Removes pipeline and sends ratis command to
    * destroy pipeline on the datanodes immediately or after timeout based on the
    * value of onTimeout parameter.
+   *
    * @param pipelineManager - SCM pipeline manager
    * @param pipeline - Pipeline to be destroyed
    * @param ozoneConf - Ozone Configuration
-   * @param onTimeout - if true pipeline is removed and destroyed on datanodes
-   *                  after timeout
+   * @param onTimeout - if true pipeline is removed and destroyed on
+   *                    datanodes after timeout
    * @throws IOException
    */
   public static void finalizeAndDestroyPipeline(PipelineManager pipelineManager,
@@ -126,6 +140,7 @@ public static void finalizeAndDestroyPipeline(PipelineManager pipelineManager,
   /**
    * Sends ratis command to destroy pipeline on the given datanode.
+   *
    * @param dn - Datanode on which pipeline needs to be destroyed
    * @param pipelineID - ID of pipeline to be destroyed
    * @param ozoneConf - Ozone configuration
@@ -184,4 +199,79 @@ private static void callRatisRpc(List<DatanodeDetails> datanodes,
       throw MultipleIOException.createIOException(exceptions);
     }
   }
+
+  /**
+   * Schedules a fixed interval job to create pipelines.
+   *
+   * @param pipelineManager - Pipeline manager
+   * @param conf - Configuration
+   */
+  public static void scheduleFixedIntervalPipelineCreator(
+      PipelineManager pipelineManager, Configuration conf) {
+    long intervalInMillis = conf
+        .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
+            ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    // TODO: #CLUTIL We can start the job asap
+    TimeDuration timeDuration =
+        TimeDuration.valueOf(intervalInMillis, TimeUnit.MILLISECONDS);
+    timeoutScheduler.onTimeout(timeDuration,
+        () -> fixedIntervalPipelineCreator(pipelineManager, conf,
+            timeDuration), LOG,
+        () -> "FixedIntervalPipelineCreatorJob failed.");
+  }
+
+  private static void fixedIntervalPipelineCreator(
+      PipelineManager pipelineManager, Configuration conf,
+      TimeDuration timeDuration) {
+    timeoutScheduler.onTimeout(timeDuration,
+        () -> fixedIntervalPipelineCreator(pipelineManager, conf,
+            timeDuration), LOG,
+        () -> "FixedIntervalPipelineCreatorJob failed.");
+    triggerPipelineCreation(pipelineManager, conf, 0);
+  }
+
+  /**
+   * Triggers pipeline creation after the specified time.
+   *
+   * @param pipelineManager - Pipeline manager
+   * @param conf - Configuration
+   * @param afterMillis - Time after which pipeline creation needs to be
+   *                    triggered
+   */
+  public static void triggerPipelineCreation(PipelineManager pipelineManager,
+      Configuration conf, long afterMillis) {
+    // TODO: #CLUTIL introduce a better mechanism to not have more than one
+    // job of a particular type running, probably via ratis.
+    if (!isPipelineCreatorRunning.compareAndSet(false, true)) {
+      return;
+    }
+    timeoutScheduler
+        .onTimeout(TimeDuration.valueOf(afterMillis, TimeUnit.MILLISECONDS),
+            () -> createPipelines(pipelineManager, conf), LOG,
+            () -> "PipelineCreation failed.");
+  }
+
+  private static void createPipelines(PipelineManager pipelineManager,
+      Configuration conf) {
+    // TODO: #CLUTIL Different replication factor may need to be supported
+    HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
+        conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
+            OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
+
+    for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
+        .values()) {
+      while (true) {
+        try {
+          pipelineManager.createPipeline(type, factor);
+        } catch (IOException ioe) {
+          break;
+        } catch (Throwable t) {
+          LOG.error("Error while creating pipelines {}", t);
+          break;
+        }
+      }
+    }
+    isPipelineCreatorRunning.set(false);
+  }
 }

View File

@@ -40,6 +40,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -213,8 +214,8 @@ public void removeContainerFromPipeline(PipelineID pipelineID,
   }
 
   @Override
-  public Set<ContainerID> getContainersInPipeline(PipelineID pipelineID)
-      throws IOException {
+  public NavigableSet<ContainerID> getContainersInPipeline(
+      PipelineID pipelineID) throws IOException {
     lock.readLock().lock();
     try {
       return stateManager.getContainers(pipelineID);

View File

@@ -60,6 +60,7 @@
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
 import org.apache.hadoop.hdds.scm.node.NewNodeHandler;
+import org.apache.hadoop.hdds.scm.node.NonHealthyToHealthyNodeHandler;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
@@ -256,11 +257,13 @@ private StorageContainerManager(OzoneConfiguration conf)
     CommandStatusReportHandler cmdStatusReportHandler =
         new CommandStatusReportHandler();
 
-    NewNodeHandler newNodeHandler = new NewNodeHandler();
+    NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager, conf);
     StaleNodeHandler staleNodeHandler =
        new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
        containerManager);
+    NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
+        new NonHealthyToHealthyNodeHandler(pipelineManager, conf);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
     PendingDeleteHandler pendingDeleteHandler =
         new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
@@ -320,6 +323,8 @@ private StorageContainerManager(OzoneConfiguration conf)
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
     eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
     eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
+    eventQueue.addHandler(SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE,
+        nonHealthyToHealthyNodeHandler);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
     eventQueue.addHandler(SCMEvents.START_REPLICATION,

View File

@@ -17,14 +17,24 @@
  */
 package org.apache.hadoop.hdds.scm;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
     .NodeRegistrationContainerReport;
+import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.common.Storage;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
 
 /**
  * Stateless helper functions for Hdds tests.
@@ -65,6 +75,21 @@ private HddsTestUtils() {
         TestUtils.getContainerReports(containers));
   }
 
+  public static StorageContainerManager getScm(OzoneConfiguration conf)
+      throws IOException, AuthenticationException {
+    conf.setBoolean(OZONE_ENABLED, true);
+    SCMStorage scmStore = new SCMStorage(conf);
+    if(scmStore.getState() != Storage.StorageState.INITIALIZED) {
+      String clusterId = UUID.randomUUID().toString();
+      String scmId = UUID.randomUUID().toString();
+      scmStore.setClusterId(clusterId);
+      scmStore.setScmId(scmId);
+      // writes the version file properties
+      scmStore.initialize();
+    }
+    return StorageContainerManager.createSCM(null, conf);
+  }
+
   /**
    * Creates list of ContainerInfo.
    *

View File

@@ -23,12 +23,16 @@
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMStorage;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -49,6 +53,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConsts.GB;
@@ -59,27 +64,33 @@
  * Tests for SCM Block Manager.
  */
 public class TestBlockManager implements EventHandler<Boolean> {
-  private static SCMContainerManager mapping;
-  private static MockNodeManager nodeManager;
-  private static PipelineManager pipelineManager;
-  private static BlockManagerImpl blockManager;
-  private static File testDir;
+  private SCMContainerManager mapping;
+  private MockNodeManager nodeManager;
+  private PipelineManager pipelineManager;
+  private BlockManagerImpl blockManager;
+  private File testDir;
   private final static long DEFAULT_BLOCK_SIZE = 128 * MB;
   private static HddsProtos.ReplicationFactor factor;
   private static HddsProtos.ReplicationType type;
   private static String containerOwner = "OZONE";
   private static EventQueue eventQueue;
+  private int numContainerPerOwnerInPipeline;
+  private Configuration conf;
 
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   @Before
   public void setUp() throws Exception {
-    Configuration conf = SCMTestUtils.getConf();
+    conf = SCMTestUtils.getConf();
+    numContainerPerOwnerInPipeline = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
     String path = GenericTestUtils
         .getTempPath(TestBlockManager.class.getSimpleName());
     testDir = Paths.get(path).toFile();
+    testDir.delete();
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, path);
     eventQueue = new EventQueue();
     boolean folderExisted = testDir.exists() || testDir.mkdirs();
@@ -95,6 +106,9 @@ public void setUp() throws Exception {
         nodeManager, pipelineManager, mapping, eventQueue);
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager);
     eventQueue.addHandler(SCMEvents.START_REPLICATION, this);
+    CloseContainerEventHandler closeContainerHandler =
+        new CloseContainerEventHandler(pipelineManager, mapping);
+    eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
     if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
         ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT)){
       factor = HddsProtos.ReplicationFactor.THREE;
@ -176,6 +190,113 @@ public void testAllocateBlockSucInChillMode() throws Exception {
type, factor, containerOwner)); type, factor, containerOwner));
} }
@Test(timeout = 10000)
public void testMultipleBlockAllocation()
throws IOException, TimeoutException, InterruptedException {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false);
GenericTestUtils
.waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5);
pipelineManager.createPipeline(type, factor);
pipelineManager.createPipeline(type, factor);
AllocatedBlock allocatedBlock = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
// the next block should eventually be allocated on a different pipeline
GenericTestUtils.waitFor(() -> {
try {
AllocatedBlock block = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
return !block.getPipeline().getId()
.equals(allocatedBlock.getPipeline().getId());
} catch (IOException e) {
}
return false;
}, 100, 1000);
}
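For orientation, a minimal sketch of the allocation order these tests exercise: an OPEN pipeline is selected first, and only then is a container looked up within it; when no pipeline is open, one is created on demand, which is what testBlockAllocationWithNoAvailablePipelines below checks. This is not the production BlockManagerImpl logic — it reuses this test's pipelineManager and mapping fields, assumes java.util.List is imported, and the simple first-pipeline choice is purely illustrative:
private ContainerInfo allocateOnPipeline(long size, String owner)
    throws IOException {
  // step 1: pipeline selection - pick an OPEN pipeline of the requested
  // type/factor, creating one on demand when none exists
  List<Pipeline> openPipelines = pipelineManager
      .getPipelines(type, factor, Pipeline.PipelineState.OPEN);
  if (openPipelines.isEmpty()) {
    pipelineManager.createPipeline(type, factor);
    openPipelines = pipelineManager
        .getPipelines(type, factor, Pipeline.PipelineState.OPEN);
  }
  Pipeline pipeline = openPipelines.get(0);
  // step 2: container selection - ask the container manager for a
  // matching container within that pipeline
  return mapping.getMatchingContainer(size, owner, pipeline);
}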
private boolean verifyNumberOfContainersInPipelines(
int numContainersPerPipeline) {
try {
for (Pipeline pipeline : pipelineManager.getPipelines(type, factor)) {
if (pipelineManager.getNumberOfContainers(pipeline.getId())
!= numContainersPerPipeline) {
return false;
}
}
} catch (IOException e) {
return false;
}
return true;
}
@Test(timeout = 10000)
public void testMultipleBlockAllocationWithClosedContainer()
throws IOException, TimeoutException, InterruptedException {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false);
GenericTestUtils
.waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5);
// create pipelines
for (int i = 0;
i < nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size(); i++) {
pipelineManager.createPipeline(type, factor);
}
// wait until each pipeline has the configured number of containers.
// After this each pipeline has numContainerPerOwnerInPipeline containers
// for each owner.
GenericTestUtils.waitFor(() -> {
try {
blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
} catch (IOException e) {
}
return verifyNumberOfContainersInPipelines(
numContainerPerOwnerInPipeline);
}, 10, 1000);
// close all the containers in all the pipelines
for (Pipeline pipeline : pipelineManager.getPipelines(type, factor)) {
for (ContainerID cid : pipelineManager
.getContainersInPipeline(pipeline.getId())) {
eventQueue.fireEvent(SCMEvents.CLOSE_CONTAINER, cid);
}
}
// wait until no containers are left in the pipelines
GenericTestUtils
.waitFor(() -> verifyNumberOfContainersInPipelines(0), 10, 5000);
// allocate blocks so that each pipeline again has the configured number
// of containers.
GenericTestUtils.waitFor(() -> {
try {
blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
} catch (IOException e) {
}
return verifyNumberOfContainersInPipelines(
numContainerPerOwnerInPipeline);
}, 10, 1000);
}
@Test(timeout = 10000)
public void testBlockAllocationWithNoAvailablePipelines()
throws IOException, TimeoutException, InterruptedException {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false);
GenericTestUtils
.waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5);
for (Pipeline pipeline : pipelineManager.getPipelines()) {
RatisPipelineUtils
.finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
}
Assert.assertEquals(0, pipelineManager.getPipelines(type, factor).size());
Assert.assertNotNull(blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner));
Assert.assertEquals(1, pipelineManager.getPipelines(type, factor).size());
}
@Override @Override
public void onMessage(Boolean aBoolean, EventPublisher publisher) { public void onMessage(Boolean aBoolean, EventPublisher publisher) {
System.out.println("test"); System.out.println("test");

View File

@ -35,24 +35,24 @@
.StorageContainerDatanodeProtocolProtos.NodeReportProto; .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto; .StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.NodeReportFromDatanode; .NodeReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -66,6 +66,7 @@
*/ */
public class TestDeadNodeHandler { public class TestDeadNodeHandler {
private StorageContainerManager scm;
private SCMNodeManager nodeManager; private SCMNodeManager nodeManager;
private ContainerManager containerManager; private ContainerManager containerManager;
private NodeReportHandler nodeReportHandler; private NodeReportHandler nodeReportHandler;
@ -75,17 +76,15 @@ public class TestDeadNodeHandler {
private String storageDir; private String storageDir;
@Before @Before
public void setup() throws IOException { public void setup() throws IOException, AuthenticationException {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = new OzoneConfiguration();
storageDir = GenericTestUtils.getTempPath( storageDir = GenericTestUtils.getTempPath(
TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID()); TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
eventQueue = new EventQueue(); eventQueue = new EventQueue();
nodeManager = new SCMNodeManager(conf, "cluster1", null, eventQueue); scm = HddsTestUtils.getScm(conf);
PipelineManager pipelineManager = nodeManager = (SCMNodeManager) scm.getScmNodeManager();
new SCMPipelineManager(conf, nodeManager, eventQueue); containerManager = scm.getContainerManager();
containerManager = new SCMContainerManager(conf, nodeManager,
pipelineManager, eventQueue);
deadNodeHandler = new DeadNodeHandler(nodeManager, containerManager); deadNodeHandler = new DeadNodeHandler(nodeManager, containerManager);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
publisher = Mockito.mock(EventPublisher.class); publisher = Mockito.mock(EventPublisher.class);
@ -94,6 +93,8 @@ public void setup() throws IOException {
@After @After
public void teardown() { public void teardown() {
scm.stop();
scm.join();
FileUtil.fullyDelete(new File(storageDir)); FileUtil.fullyDelete(new File(storageDir));
} }

View File

@ -19,6 +19,7 @@
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@ -28,11 +29,12 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto; .StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.PathUtils;
import org.junit.After; import org.junit.After;
@ -43,7 +45,6 @@
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -77,6 +78,7 @@
public class TestSCMNodeManager { public class TestSCMNodeManager {
private File testDir; private File testDir;
private StorageContainerManager scm;
@Rule @Rule
public ExpectedException thrown = ExpectedException.none(); public ExpectedException thrown = ExpectedException.none();
@ -93,6 +95,10 @@ public void setup() {
@After @After
public void cleanup() { public void cleanup() {
if (scm != null) {
scm.stop();
scm.join();
}
FileUtil.fullyDelete(testDir); FileUtil.fullyDelete(testDir);
} }
@ -119,17 +125,9 @@ OzoneConfiguration getConf() {
*/ */
SCMNodeManager createNodeManager(OzoneConfiguration config) SCMNodeManager createNodeManager(OzoneConfiguration config)
throws IOException { throws IOException, AuthenticationException {
EventQueue eventQueue = new EventQueue(); scm = HddsTestUtils.getScm(config);
eventQueue.addHandler(SCMEvents.NEW_NODE, return (SCMNodeManager) scm.getScmNodeManager();
Mockito.mock(NewNodeHandler.class));
eventQueue.addHandler(SCMEvents.STALE_NODE,
Mockito.mock(StaleNodeHandler.class));
eventQueue.addHandler(SCMEvents.DEAD_NODE,
Mockito.mock(DeadNodeHandler.class));
SCMNodeManager nodeManager = new SCMNodeManager(config,
UUID.randomUUID().toString(), null, eventQueue);
return nodeManager;
} }
/** /**
@ -141,8 +139,8 @@ SCMNodeManager createNodeManager(OzoneConfiguration config)
* @throws TimeoutException * @throws TimeoutException
*/ */
@Test @Test
public void testScmHeartbeat() throws IOException, public void testScmHeartbeat()
InterruptedException, TimeoutException { throws IOException, InterruptedException, AuthenticationException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) { try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
int registeredNodes = 5; int registeredNodes = 5;
@ -169,8 +167,8 @@ public void testScmHeartbeat() throws IOException,
* @throws TimeoutException * @throws TimeoutException
*/ */
@Test @Test
public void testScmNoHeartbeats() throws IOException, public void testScmNoHeartbeats()
InterruptedException, TimeoutException { throws IOException, InterruptedException, AuthenticationException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) { try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
//TODO: wait for heartbeat to be processed //TODO: wait for heartbeat to be processed
@ -190,8 +188,8 @@ public void testScmNoHeartbeats() throws IOException,
* @throws TimeoutException * @throws TimeoutException
*/ */
@Test @Test
public void testScmShutdown() throws IOException, InterruptedException, public void testScmShutdown()
TimeoutException { throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf(); OzoneConfiguration conf = getConf();
conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
100, TimeUnit.MILLISECONDS); 100, TimeUnit.MILLISECONDS);
@ -218,8 +216,8 @@ public void testScmShutdown() throws IOException, InterruptedException,
* @throws TimeoutException * @throws TimeoutException
*/ */
@Test @Test
public void testScmHealthyNodeCount() throws IOException, public void testScmHealthyNodeCount()
InterruptedException, TimeoutException { throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf(); OzoneConfiguration conf = getConf();
final int count = 10; final int count = 10;
@ -247,8 +245,8 @@ public void testScmHealthyNodeCount() throws IOException,
*/ */
@Test @Test
public void testScmSanityOfUserConfig1() throws IOException, public void testScmSanityOfUserConfig1()
InterruptedException, TimeoutException { throws IOException, AuthenticationException {
OzoneConfiguration conf = getConf(); OzoneConfiguration conf = getConf();
final int interval = 100; final int interval = 100;
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval,
@ -276,8 +274,8 @@ public void testScmSanityOfUserConfig1() throws IOException,
* @throws TimeoutException * @throws TimeoutException
*/ */
@Test @Test
public void testScmSanityOfUserConfig2() throws IOException, public void testScmSanityOfUserConfig2()
InterruptedException, TimeoutException { throws IOException, AuthenticationException {
OzoneConfiguration conf = getConf(); OzoneConfiguration conf = getConf();
final int interval = 100; final int interval = 100;
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval,
@ -299,8 +297,8 @@ public void testScmSanityOfUserConfig2() throws IOException,
* @throws TimeoutException * @throws TimeoutException
*/ */
@Test @Test
public void testScmDetectStaleAndDeadNode() throws IOException, public void testScmDetectStaleAndDeadNode()
InterruptedException, TimeoutException { throws IOException, InterruptedException, AuthenticationException {
final int interval = 100; final int interval = 100;
final int nodeCount = 10; final int nodeCount = 10;
@ -379,7 +377,8 @@ public void testScmDetectStaleAndDeadNode() throws IOException,
* @throws IOException * @throws IOException
*/ */
@Test @Test
public void testScmCheckForErrorOnNullDatanodeDetails() throws IOException { public void testScmCheckForErrorOnNullDatanodeDetails()
throws IOException, AuthenticationException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) { try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
nodeManager.processHeartbeat(null); nodeManager.processHeartbeat(null);
} catch (NullPointerException npe) { } catch (NullPointerException npe) {
@ -438,8 +437,8 @@ public void testScmCheckForErrorOnNullDatanodeDetails() throws IOException {
*/ */
@Test @Test
public void testScmClusterIsInExpectedState1() throws IOException, public void testScmClusterIsInExpectedState1()
InterruptedException, TimeoutException { throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf(); OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
MILLISECONDS); MILLISECONDS);
@ -613,8 +612,9 @@ private boolean findNodes(NodeManager nodeManager, int count,
* @throws InterruptedException * @throws InterruptedException
*/ */
@Test @Test
public void testScmClusterIsInExpectedState2() throws IOException, public void testScmClusterIsInExpectedState2()
InterruptedException, TimeoutException { throws IOException, InterruptedException, TimeoutException,
AuthenticationException {
final int healthyCount = 5000; final int healthyCount = 5000;
final int staleCount = 100; final int staleCount = 100;
final int deadCount = 10; final int deadCount = 10;
@ -706,8 +706,9 @@ public void testScmClusterIsInExpectedState2() throws IOException,
* @throws TimeoutException * @throws TimeoutException
*/ */
@Test @Test
public void testScmCanHandleScale() throws IOException, public void testScmCanHandleScale()
InterruptedException, TimeoutException { throws IOException, InterruptedException, TimeoutException,
AuthenticationException {
final int healthyCount = 3000; final int healthyCount = 3000;
final int staleCount = 3000; final int staleCount = 3000;
OzoneConfiguration conf = getConf(); OzoneConfiguration conf = getConf();
@ -770,8 +771,8 @@ public void testScmCanHandleScale() throws IOException,
@Test @Test
@Ignore @Ignore
// TODO: Enable this after we implement NodeReportEvent handler. // TODO: Enable this after we implement NodeReportEvent handler.
public void testScmStatsFromNodeReport() throws IOException, public void testScmStatsFromNodeReport()
InterruptedException, TimeoutException { throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf(); OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000, conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000,
MILLISECONDS); MILLISECONDS);
@ -813,8 +814,9 @@ public void testScmStatsFromNodeReport() throws IOException,
@Test @Test
@Ignore @Ignore
// TODO: Enable this after we implement NodeReportEvent handler. // TODO: Enable this after we implement NodeReportEvent handler.
public void testScmNodeReportUpdate() throws IOException, public void testScmNodeReportUpdate()
InterruptedException, TimeoutException { throws IOException, InterruptedException, TimeoutException,
AuthenticationException {
OzoneConfiguration conf = getConf(); OzoneConfiguration conf = getConf();
final int heartbeatCount = 5; final int heartbeatCount = 5;
final int nodeCount = 1; final int nodeCount = 1;
@ -939,7 +941,8 @@ public void testScmNodeReportUpdate() throws IOException,
} }
@Test @Test
public void testHandlingSCMCommandEvent() throws IOException { public void testHandlingSCMCommandEvent()
throws IOException, AuthenticationException {
OzoneConfiguration conf = getConf(); OzoneConfiguration conf = getConf();
conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
100, TimeUnit.MILLISECONDS); 100, TimeUnit.MILLISECONDS);

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.hadoop.hdds.scm.container; package org.apache.hadoop.hdds.scm.container;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
@ -23,6 +26,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
@ -34,10 +38,12 @@
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.concurrent.CompletableFuture;
import java.util.NavigableSet; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
/** /**
@ -45,6 +51,9 @@
*/ */
public class TestContainerStateManagerIntegration { public class TestContainerStateManagerIntegration {
private static final Logger LOG =
LoggerFactory.getLogger(TestContainerStateManagerIntegration.class);
private OzoneConfiguration conf; private OzoneConfiguration conf;
private MiniOzoneCluster cluster; private MiniOzoneCluster cluster;
private XceiverClientManager xceiverClientManager; private XceiverClientManager xceiverClientManager;
@ -52,11 +61,15 @@ public class TestContainerStateManagerIntegration {
private ContainerManager containerManager; private ContainerManager containerManager;
private ContainerStateManager containerStateManager; private ContainerStateManager containerStateManager;
private String containerOwner = "OZONE"; private String containerOwner = "OZONE";
private int numContainerPerOwnerInPipeline;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
conf = new OzoneConfiguration(); conf = new OzoneConfiguration();
numContainerPerOwnerInPipeline =
conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build(); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
cluster.waitForClusterToBeReady(); cluster.waitForClusterToBeReady();
cluster.waitTobeOutOfChillMode(); cluster.waitTobeOutOfChillMode();
@ -80,11 +93,10 @@ public void testAllocateContainer() throws IOException {
ContainerWithPipeline container1 = scm.getClientProtocolServer() ContainerWithPipeline container1 = scm.getClientProtocolServer()
.allocateContainer(xceiverClientManager.getType(), .allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner); xceiverClientManager.getFactor(), containerOwner);
ContainerInfo info = containerStateManager ContainerInfo info = containerManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner, .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(), container1.getPipeline());
HddsProtos.LifeCycleState.OPEN); Assert.assertNotEquals(container1.getContainerInfo().getContainerID(),
Assert.assertEquals(container1.getContainerInfo().getContainerID(),
info.getContainerID()); info.getContainerID());
Assert.assertEquals(containerOwner, info.getOwner()); Assert.assertEquals(containerOwner, info.getOwner());
Assert.assertEquals(xceiverClientManager.getType(), Assert.assertEquals(xceiverClientManager.getType(),
@ -104,7 +116,7 @@ public void testAllocateContainer() throws IOException {
HddsProtos.LifeCycleState.OPEN).size(); HddsProtos.LifeCycleState.OPEN).size();
Assert.assertNotEquals(container1.getContainerInfo().getContainerID(), Assert.assertNotEquals(container1.getContainerInfo().getContainerID(),
container2.getContainerInfo().getContainerID()); container2.getContainerInfo().getContainerID());
Assert.assertEquals(2, numContainers); Assert.assertEquals(3, numContainers);
} }
@Test @Test
@ -156,36 +168,71 @@ public void testContainerStateManagerRestart() throws IOException,
@Test @Test
public void testGetMatchingContainer() throws IOException { public void testGetMatchingContainer() throws IOException {
long cid;
ContainerWithPipeline container1 = scm.getClientProtocolServer(). ContainerWithPipeline container1 = scm.getClientProtocolServer().
allocateContainer(xceiverClientManager.getType(), allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner); xceiverClientManager.getFactor(), containerOwner);
cid = container1.getContainerInfo().getContainerID();
ContainerInfo info = containerStateManager // each getMatchingContainer call allocates a container in the
// pipeline till the pipeline has numContainerPerOwnerInPipeline number of
// containers.
for (int i = 1; i < numContainerPerOwnerInPipeline; i++) {
ContainerInfo info = containerManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner, .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(), container1.getPipeline());
HddsProtos.LifeCycleState.OPEN); Assert.assertTrue(info.getContainerID() > cid);
cid = info.getContainerID();
}
// At this point the pipeline already has numContainerPerOwnerInPipeline
// containers for this owner, so the next call should return the first
// container again.
ContainerInfo info = containerManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
container1.getPipeline());
Assert.assertEquals(container1.getContainerInfo().getContainerID(), Assert.assertEquals(container1.getContainerInfo().getContainerID(),
info.getContainerID()); info.getContainerID());
}
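Spelled out for the default ozone.scm.pipeline.owner.container.count of 3, the sequence the test above asserts looks roughly like this (C1 < C2 < C3 are hypothetical container IDs):
// allocateContainer        -> C1  (created together with the pipeline)
// getMatchingContainer #1  -> C2  (new container, C2 > C1)
// getMatchingContainer #2  -> C3  (new container, C3 > C2)
// getMatchingContainer #3  -> C1  (pipeline is full for this owner,
//                                  existing containers are reused)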
ContainerWithPipeline container2 = scm.getClientProtocolServer(). @Test
public void testGetMatchingContainerMultipleThreads()
throws IOException, InterruptedException {
ContainerWithPipeline container1 = scm.getClientProtocolServer().
allocateContainer(xceiverClientManager.getType(), allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner); xceiverClientManager.getFactor(), containerOwner);
info = containerStateManager Map<Long, Long> container2MatchedCount = new ConcurrentHashMap<>();
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN);
// space has already been allocated in container1, now container 2 should
// be chosen.
Assert.assertEquals(container2.getContainerInfo().getContainerID(),
info.getContainerID());
// now we have to get container1 // allocate blocks using multiple threads
info = containerStateManager int numBlockAllocates = 100000;
for (int i = 0; i < numBlockAllocates; i++) {
CompletableFuture.supplyAsync(() -> {
ContainerInfo info = containerManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner, .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(), container1.getPipeline());
HddsProtos.LifeCycleState.OPEN); container2MatchedCount
Assert.assertEquals(container1.getContainerInfo().getContainerID(), .compute(info.getContainerID(), (k, v) -> v == null ? 1L : v + 1);
info.getContainerID()); return null;
});
}
// make sure the pipeline has numContainerPerOwnerInPipeline number of
// containers.
Assert.assertEquals(scm.getPipelineManager()
.getNumberOfContainers(container1.getPipeline().getId()),
numContainerPerOwnerInPipeline);
Thread.sleep(5000);
long threshold = 2000;
// check the way the block allocations are distributed in the different
// containers.
for (Long matchedCount : container2MatchedCount.values()) {
// TODO: #CLUTIL Look at the division of block allocations in different
// containers.
LOG.error("Total allocated block = " + matchedCount);
Assert.assertTrue(matchedCount <=
numBlockAllocates / container2MatchedCount.size() + threshold
&& matchedCount >=
numBlockAllocates / container2MatchedCount.size() - threshold);
}
} }
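The threshold check above tolerates a fairly loose spread; a rough worked version of the numbers, assuming container2MatchedCount ends up holding the default three containers and the allocations are spread close to evenly:
int numBlockAllocates = 100000;
int numContainers = 3;             // expected container2MatchedCount.size()
long threshold = 2000;
long expected = numBlockAllocates / numContainers;   // 33333
long lowerBound = expected - threshold;              // 31333
long upperBound = expected + threshold;              // 35333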
@Test @Test

View File

@ -100,9 +100,7 @@ public void testPipelineMap() throws IOException {
// get pipeline details by dnid // get pipeline details by dnid
Set<PipelineID> pipelines = scm.getScmNodeManager() Set<PipelineID> pipelines = scm.getScmNodeManager()
.getPipelines(dns.get(0)); .getPipelines(dns.get(0));
Assert.assertEquals(1, pipelines.size()); Assert.assertTrue(pipelines.contains(ratisContainer.getPipeline().getId()));
pipelines.forEach(p -> Assert.assertEquals(p,
ratisContainer.getPipeline().getId()));
// Now close the container and it should not show up while fetching // Now close the container and it should not show up while fetching
// containers by pipeline // containers by pipeline
@ -118,6 +116,7 @@ public void testPipelineMap() throws IOException {
pipelineManager.removePipeline(ratisContainer.getPipeline().getId()); pipelineManager.removePipeline(ratisContainer.getPipeline().getId());
pipelines = scm.getScmNodeManager() pipelines = scm.getScmNodeManager()
.getPipelines(dns.get(0)); .getPipelines(dns.get(0));
Assert.assertEquals(0, pipelines.size()); Assert
.assertFalse(pipelines.contains(ratisContainer.getPipeline().getId()));
} }
} }

View File

@ -127,8 +127,8 @@ public void testPipelineCloseWithClosedContainer() throws IOException {
pipelineManager.removePipeline(pipeline1.getId()); pipelineManager.removePipeline(pipeline1.getId());
for (DatanodeDetails dn : ratisContainer.getPipeline().getNodes()) { for (DatanodeDetails dn : ratisContainer.getPipeline().getNodes()) {
// Assert that the pipeline has been removed from Node2PipelineMap as well // Assert that the pipeline has been removed from Node2PipelineMap as well
Assert.assertEquals(scm.getScmNodeManager().getPipelines( Assert.assertFalse(scm.getScmNodeManager().getPipelines(dn)
dn).size(), 0); .contains(ratisContainer.getPipeline().getId()));
} }
} }

View File

@ -84,7 +84,7 @@ private List<DatanodeDetails> createListOfNodes(int nodeCount) {
} }
@Test @Test
public void testCreatePipelineWithNodes() throws IOException { public void testCreatePipelineWithNodes() {
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
Pipeline pipeline = Pipeline pipeline =
provider.create(factor, createListOfNodes(factor.getNumber())); provider.create(factor, createListOfNodes(factor.getNumber()));

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
* Tests for RatisPipelineUtils.
*/
public class TestRatisPipelineUtils {
private static MiniOzoneCluster cluster;
private OzoneConfiguration conf = new OzoneConfiguration();
private static PipelineManager pipelineManager;
public void init(int numDatanodes) throws Exception {
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numDatanodes)
.setHbInterval(1000)
.setHbProcessorInterval(1000)
.build();
cluster.waitForClusterToBeReady();
StorageContainerManager scm = cluster.getStorageContainerManager();
pipelineManager = scm.getPipelineManager();
}
@Test(timeout = 30000)
public void testAutomaticPipelineCreationOnPipelineDestroy()
throws Exception {
init(6);
// make sure two pipelines are created
waitForPipelines(2);
List<Pipeline> pipelines = pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN);
for (Pipeline pipeline : pipelines) {
RatisPipelineUtils
.finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
}
// make sure two new pipelines are created automatically
waitForPipelines(2);
}
@Test(timeout = 30000)
public void testPipelineCreationOnNodeRestart() throws Exception {
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
5, TimeUnit.SECONDS);
init(3);
// make sure a pipeline is created
waitForPipelines(1);
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
cluster.shutdownHddsDatanode(dn.getDatanodeDetails());
}
// make sure the pipeline is destroyed
waitForPipelines(0);
cluster.startHddsDatanodes();
// make sure a pipeline is created after the datanodes restart
waitForPipelines(1);
}
private void waitForPipelines(int numPipelines)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
.size() == numPipelines, 100, 10000);
}
}

View File

@ -58,7 +58,7 @@ public class TestSCMRestart {
public static void init() throws Exception { public static void init() throws Exception {
conf = new OzoneConfiguration(); conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf) cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(6) .setNumDatanodes(7)
.setHbInterval(1000) .setHbInterval(1000)
.setHbProcessorInterval(1000) .setHbProcessorInterval(1000)
.build(); .build();

View File

@ -204,7 +204,7 @@ public void testBlockDeletionTransactions() throws Exception {
1, TimeUnit.SECONDS); 1, TimeUnit.SECONDS);
// Reset container provision size, otherwise only one container // Reset container provision size, otherwise only one container
// is created by default. // is created by default.
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
numKeys); numKeys);
MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf) MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
@ -286,7 +286,7 @@ public void testBlockDeletingThrottling() throws Exception {
conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5); conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL, conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
100, TimeUnit.MILLISECONDS); 100, TimeUnit.MILLISECONDS);
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
numKeys); numKeys);
MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf) MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)

View File

@ -44,7 +44,7 @@ public class TestOzoneRpcClient extends TestOzoneRpcClientAbstract {
@BeforeClass @BeforeClass
public static void init() throws Exception { public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
startCluster(conf); startCluster(conf);
} }

View File

@ -42,7 +42,7 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
@BeforeClass @BeforeClass
public static void init() throws Exception { public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
startCluster(conf); startCluster(conf);
} }

View File

@ -86,7 +86,7 @@ public static void init() throws Exception {
OzoneManager.setTestSecureOmFlag(true); OzoneManager.setTestSecureOmFlag(true);
conf = new OzoneConfiguration(); conf = new OzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, true); conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, true);
conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath()); conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
CertificateClientTestImpl certificateClientTest = CertificateClientTestImpl certificateClientTest =

View File

@ -68,9 +68,10 @@ public class TestCloseContainerByPipeline {
@BeforeClass @BeforeClass
public static void init() throws Exception { public static void init() throws Exception {
conf = new OzoneConfiguration(); conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, "1"); conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, "1");
cluster = MiniOzoneCluster.newBuilder(conf) cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(9).build(); .setNumDatanodes(10)
.build();
cluster.waitForClusterToBeReady(); cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key //the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf); client = OzoneClientFactory.getClient(conf);

View File

@ -72,7 +72,7 @@ public static void init() throws Exception {
conf = new OzoneConfiguration(); conf = new OzoneConfiguration();
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 1, conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 1,
StorageUnit.MB); StorageUnit.MB);
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 5); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 5);
cluster = MiniOzoneCluster.newBuilder(conf).build(); cluster = MiniOzoneCluster.newBuilder(conf).build();
cluster.waitForClusterToBeReady(); cluster.waitForClusterToBeReady();
storageHandler = new ObjectStoreHandler(conf).getStorageHandler(); storageHandler = new ObjectStoreHandler(conf).getStorageHandler();

View File

@ -17,7 +17,6 @@
*/ */
package org.apache.hadoop.ozone.om; package org.apache.hadoop.ozone.om;
import com.google.common.util.concurrent.AtomicDouble;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -128,9 +127,7 @@ public void testChillModeOperations() throws Exception {
Map<String, OmKeyInfo> keyLocations = helper.createKeys(100, 4096); Map<String, OmKeyInfo> keyLocations = helper.createKeys(100, 4096);
final List<ContainerInfo> containers = cluster final List<ContainerInfo> containers = cluster
.getStorageContainerManager().getContainerManager().getContainers(); .getStorageContainerManager().getContainerManager().getContainers();
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(() -> containers.size() >= 3, 100, 1000);
return containers.size() > 10;
}, 100, 1000);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5); String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
@ -251,15 +248,11 @@ public void testSCMChillMode() throws Exception {
Map<String, OmKeyInfo> keyLocations = helper.createKeys(100 * 2, 4096); Map<String, OmKeyInfo> keyLocations = helper.createKeys(100 * 2, 4096);
final List<ContainerInfo> containers = miniCluster final List<ContainerInfo> containers = miniCluster
.getStorageContainerManager().getContainerManager().getContainers(); .getStorageContainerManager().getContainerManager().getContainers();
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(() -> containers.size() >= 3, 100, 1000 * 2);
return containers.size() > 10;
}, 100, 1000 * 2);
// Removing some container to keep them open. // Removing some container to keep them open.
containers.remove(0); containers.remove(0);
containers.remove(1); containers.remove(0);
containers.remove(2);
containers.remove(3);
// Close remaining containers // Close remaining containers
SCMContainerManager mapping = (SCMContainerManager) miniCluster SCMContainerManager mapping = (SCMContainerManager) miniCluster
@ -300,16 +293,11 @@ public void testSCMChillMode() throws Exception {
assertTrue(scm.isInChillMode()); assertTrue(scm.isInChillMode());
assertFalse(logCapturer.getOutput().contains("SCM exiting chill mode.")); assertFalse(logCapturer.getOutput().contains("SCM exiting chill mode."));
assertTrue(scm.getCurrentContainerThreshold() == 0); assertTrue(scm.getCurrentContainerThreshold() == 0);
AtomicDouble curThreshold = new AtomicDouble();
AtomicDouble lastReportedThreshold = new AtomicDouble();
for (HddsDatanodeService dn : miniCluster.getHddsDatanodes()) { for (HddsDatanodeService dn : miniCluster.getHddsDatanodes()) {
dn.start(null); dn.start(null);
GenericTestUtils.waitFor(() -> {
curThreshold.set(scm.getCurrentContainerThreshold());
return curThreshold.get() > lastReportedThreshold.get();
}, 100, 1000 * 5);
lastReportedThreshold.set(curThreshold.get());
} }
GenericTestUtils
.waitFor(() -> scm.getCurrentContainerThreshold() == 1.0, 100, 20000);
cluster = miniCluster; cluster = miniCluster;
double chillModeCutoff = conf double chillModeCutoff = conf
.getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT, .getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,

View File

@ -104,7 +104,7 @@ public void setup() throws Exception {
blockContainerMap = new HashMap<>(); blockContainerMap = new HashMap<>();
conf = new OzoneConfiguration(); conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 2); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 2);
conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class); SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
@ -136,7 +136,7 @@ public void setup() throws Exception {
}, 10, 1000 * 15); }, 10, 1000 * 15);
// blockManager.allocateBlock() will create containers if there is none // blockManager.allocateBlock() will create containers if there is none
// stored in levelDB. The number of containers to create is the value of // stored in levelDB. The number of containers to create is the value of
// OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE which we set to 2. // OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT which we set to 2.
// so the first allocateBlock() will create two containers. A random one // so the first allocateBlock() will create two containers. A random one
// is assigned for the block. // is assigned for the block.