HDDS-429. StorageContainerManager lock optimization.

Contributed by Nanda Kumar.
Anu Engineer 2018-09-14 10:08:06 -07:00
parent 144a55f0e3
commit 0c8a43b9ec
2 changed files with 228 additions and 182 deletions
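
The change replaces the single exclusive lock in BlockManagerImpl (a ReentrantLock) and the AutoCloseableLock in ContainerStateMap with a ReentrantReadWriteLock, so read-only lookups can proceed concurrently and only mutations take the exclusive write lock. A minimal illustrative sketch of that pattern follows; the class and field names are placeholders, not code from this commit.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only: read-mostly state guarded by a ReentrantReadWriteLock.
public class ReadMostlyStateMap {
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private final Map<Long, String> map = new HashMap<>();

  public String get(long id) {
    lock.readLock().lock();        // shared: many readers at once
    try {
      return map.get(id);
    } finally {
      lock.readLock().unlock();
    }
  }

  public void put(long id, String value) {
    lock.writeLock().lock();       // exclusive: blocks readers and writers
    try {
      map.put(id, value);
    } finally {
      lock.writeLock().unlock();
    }
  }
}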

BlockManagerImpl.java

@@ -16,7 +16,6 @@
  */
 package org.apache.hadoop.hdds.scm.block;
 
-import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -45,8 +44,8 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .CHILL_MODE_EXCEPTION;
@@ -72,7 +71,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   private final NodeManager nodeManager;
   private final Mapping containerManager;
-  private final Lock lock;
+  private final ReadWriteLock lock;
   private final long containerSize;
   private final DeletedBlockLog deletedBlockLog;
@@ -108,7 +107,7 @@ public BlockManagerImpl(final Configuration conf,
         ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
     rand = new Random();
-    this.lock = new ReentrantLock();
+    this.lock = new ReentrantReadWriteLock();
 
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
@@ -155,29 +154,22 @@ public void stop() throws IOException {
    * @param factor - how many copies needed for this container.
    * @throws IOException
    */
-  private void preAllocateContainers(int count, ReplicationType type,
-      ReplicationFactor factor, String owner)
+  private synchronized void preAllocateContainers(int count,
+      ReplicationType type, ReplicationFactor factor, String owner)
       throws IOException {
-    lock.lock();
-    try {
-      for (int i = 0; i < count; i++) {
-        ContainerWithPipeline containerWithPipeline;
-        try {
-          // TODO: Fix this later when Ratis is made the Default.
-          containerWithPipeline = containerManager.allocateContainer(
-              type, factor, owner);
-          if (containerWithPipeline == null) {
-            LOG.warn("Unable to allocate container.");
-            continue;
-          }
-        } catch (IOException ex) {
-          LOG.warn("Unable to allocate container: {}", ex);
-          continue;
+    for (int i = 0; i < count; i++) {
+      ContainerWithPipeline containerWithPipeline;
+      try {
+        // TODO: Fix this later when Ratis is made the Default.
+        containerWithPipeline = containerManager.allocateContainer(
+            type, factor, owner);
+        if (containerWithPipeline == null) {
+          LOG.warn("Unable to allocate container.");
         }
+      } catch (IOException ex) {
+        LOG.warn("Unable to allocate container: {}", ex);
       }
-    } finally {
-      lock.unlock();
     }
   }
@@ -208,46 +200,61 @@ public AllocatedBlock allocateBlock(final long size,
           CHILL_MODE_EXCEPTION);
     }
 
-    lock.lock();
-    try {
-      /*
-        Here is the high level logic.
-
-        1. First we check if there are containers in ALLOCATED state, that is
-        SCM has allocated them in the SCM namespace but the corresponding
-        container has not been created in the Datanode yet. If we have any
-        in that state, we will return that to the client, which allows
-        client to finish creating those containers. This is a sort of greedy
-        algorithm, our primary purpose is to get as many containers as
-        possible.
-
-        2. If there are no allocated containers -- Then we find a Open
-        container that matches that pattern.
-
-        3. If both of them fail, the we will pre-allocate a bunch of
-        conatainers in SCM and try again.
-
-        TODO : Support random picking of two containers from the list. So we
-        can use different kind of policies.
-      */
-
-      ContainerWithPipeline containerWithPipeline;
-
-      // Look for ALLOCATED container that matches all other parameters.
-      containerWithPipeline = containerManager
-          .getMatchingContainerWithPipeline(size, owner, type, factor,
-              HddsProtos.LifeCycleState.ALLOCATED);
-      if (containerWithPipeline != null) {
-        containerManager.updateContainerState(
-            containerWithPipeline.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-        return newBlock(containerWithPipeline,
-            HddsProtos.LifeCycleState.ALLOCATED);
-      }
+    /*
+      Here is the high level logic.
+      1. First we check if there are containers in ALLOCATED state, that is
+      SCM has allocated them in the SCM namespace but the corresponding
+      container has not been created in the Datanode yet. If we have any in
+      that state, we will return that to the client, which allows client to
+      finish creating those containers. This is a sort of greedy algorithm,
+      our primary purpose is to get as many containers as possible.
+      2. If there are no allocated containers -- Then we find a Open container
+      that matches that pattern.
+      3. If both of them fail, the we will pre-allocate a bunch of containers
+      in SCM and try again.
+      TODO : Support random picking of two containers from the list. So we can
+      use different kind of policies.
+    */
+    ContainerWithPipeline containerWithPipeline;
+    lock.readLock().lock();
+    try {
+      // This is to optimize performance, if the below condition is evaluated
+      // to false, then we can be sure that there are no containers in
+      // ALLOCATED state.
+      // This can result in false positive, but it will never be false negative.
+      // How can this result in false positive? We check if there are any
+      // containers in ALLOCATED state, this check doesn't care about the
+      // USER of the containers. So there might be cases where a different
+      // USER has few containers in ALLOCATED state, which will result in
+      // false positive.
+      if (!containerManager.getStateManager().getContainerStateMap()
+          .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
+          .isEmpty()) {
+        // Since the above check can result in false positive, we have to do
+        // the actual check and find out if there are containers in ALLOCATED
+        // state matching our criteria.
+        synchronized (this) {
+          // Using containers from ALLOCATED state should be done within
+          // synchronized block (or) write lock. Since we already hold a
+          // read lock, we will end up in deadlock situation if we take
+          // write lock here.
+          containerWithPipeline = containerManager
+              .getMatchingContainerWithPipeline(size, owner, type, factor,
                  HddsProtos.LifeCycleState.ALLOCATED);
+          if (containerWithPipeline != null) {
+            containerManager.updateContainerState(
+                containerWithPipeline.getContainerInfo().getContainerID(),
+                HddsProtos.LifeCycleEvent.CREATE);
+            return newBlock(containerWithPipeline,
+                HddsProtos.LifeCycleState.ALLOCATED);
+          }
+        }
+      }
 
       // Since we found no allocated containers that match our criteria, let us
@@ -263,20 +270,34 @@ public AllocatedBlock allocateBlock(final long size,
       // that most of our containers are full or we have not allocated
       // containers of the type and replication factor. So let us go and
       // allocate some.
-      preAllocateContainers(containerProvisionBatchSize, type, factor, owner);
 
-      // Since we just allocated a set of containers this should work
-      containerWithPipeline = containerManager
-          .getMatchingContainerWithPipeline(size, owner, type, factor,
-              HddsProtos.LifeCycleState.ALLOCATED);
-      if (containerWithPipeline != null) {
-        containerManager.updateContainerState(
-            containerWithPipeline.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-        return newBlock(containerWithPipeline,
-            HddsProtos.LifeCycleState.ALLOCATED);
+      // Even though we have already checked the containers in ALLOCATED
+      // state, we have to check again as we only hold a read lock.
+      // Some other thread might have pre-allocated container in meantime.
+      synchronized (this) {
+        if (!containerManager.getStateManager().getContainerStateMap()
+            .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
+            .isEmpty()) {
+          containerWithPipeline = containerManager
+              .getMatchingContainerWithPipeline(size, owner, type, factor,
+                  HddsProtos.LifeCycleState.ALLOCATED);
+        }
+        if (containerWithPipeline == null) {
+          preAllocateContainers(containerProvisionBatchSize,
+              type, factor, owner);
+          containerWithPipeline = containerManager
+              .getMatchingContainerWithPipeline(size, owner, type, factor,
+                  HddsProtos.LifeCycleState.ALLOCATED);
+        }
+
+        if (containerWithPipeline != null) {
+          containerManager.updateContainerState(
+              containerWithPipeline.getContainerInfo().getContainerID(),
+              HddsProtos.LifeCycleEvent.CREATE);
+          return newBlock(containerWithPipeline,
+              HddsProtos.LifeCycleState.ALLOCATED);
+        }
       }
 
       // we have tried all strategies we know and but somehow we are not able
       // to get a container for this block. Log that info and return a null.
       LOG.error(
@@ -286,19 +307,9 @@ public AllocatedBlock allocateBlock(final long size,
         type,
         factor);
       return null;
     } finally {
-      lock.unlock();
+      lock.readLock().unlock();
     }
   }
-
-  private String getChannelName(ReplicationType type) {
-    switch (type) {
-    case RATIS:
-      return "RA" + UUID.randomUUID().toString().substring(3);
-    case STAND_ALONE:
-      return "SA" + UUID.randomUUID().toString().substring(3);
-    default:
-      return "RA" + UUID.randomUUID().toString().substring(3);
-    }
-  }
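
The allocateBlock() rewrite above holds only the read lock for the whole call and relies on a cheap, possibly false-positive emptiness check plus a synchronized double-check, because a ReentrantReadWriteLock read lock cannot be upgraded to the write lock without deadlocking. A simplified sketch of that shape, with placeholder method names rather than the SCM APIs:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only: read lock for the whole operation, synchronized blocks
// to serialize the small sections that consume or create candidates.
public class DoubleCheckedAllocator {
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  public Long allocate() {
    lock.readLock().lock();
    try {
      if (maybeHasCandidates()) {          // cheap check, may be a false positive
        synchronized (this) {              // cannot upgrade read lock to write lock
          Long candidate = findCandidate();
          if (candidate != null) {
            return candidate;
          }
        }
      }
      synchronized (this) {                // re-check; another thread may have
        Long candidate = findCandidate();  // pre-allocated in the meantime
        if (candidate == null) {
          allocateMore();
          candidate = findCandidate();
        }
        return candidate;
      }
    } finally {
      lock.readLock().unlock();
    }
  }

  private boolean maybeHasCandidates() { return true; }   // placeholder
  private Long findCandidate() { return null; }           // placeholder
  private void allocateMore() { }                         // placeholder
}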
@@ -353,40 +364,34 @@ public void deleteBlocks(List<BlockID> blockIDs) throws IOException {
           CHILL_MODE_EXCEPTION);
     }
 
-    lock.lock();
     LOG.info("Deleting blocks {}", StringUtils.join(",", blockIDs));
     Map<Long, List<Long>> containerBlocks = new HashMap<>();
     // TODO: track the block size info so that we can reclaim the container
     // TODO: used space when the block is deleted.
-    try {
-      for (BlockID block : blockIDs) {
-        // Merge blocks to a container to blocks mapping,
-        // prepare to persist this info to the deletedBlocksLog.
-        long containerID = block.getContainerID();
-        if (containerBlocks.containsKey(containerID)) {
-          containerBlocks.get(containerID).add(block.getLocalID());
-        } else {
-          List<Long> item = new ArrayList<>();
-          item.add(block.getLocalID());
-          containerBlocks.put(containerID, item);
-        }
+    for (BlockID block : blockIDs) {
+      // Merge blocks to a container to blocks mapping,
+      // prepare to persist this info to the deletedBlocksLog.
+      long containerID = block.getContainerID();
+      if (containerBlocks.containsKey(containerID)) {
+        containerBlocks.get(containerID).add(block.getLocalID());
+      } else {
+        List<Long> item = new ArrayList<>();
+        item.add(block.getLocalID());
+        containerBlocks.put(containerID, item);
       }
+    }
 
-      try {
-        deletedBlockLog.addTransactions(containerBlocks);
-      } catch (IOException e) {
-        throw new IOException(
-            "Skip writing the deleted blocks info to"
-                + " the delLog because addTransaction fails. Batch skipped: "
-                + StringUtils.join(",", blockIDs),
-            e);
-      }
-      // TODO: Container report handling of the deleted blocks:
-      // Remove tombstone and update open container usage.
-      // We will revisit this when the closed container replication is done.
-    } finally {
-      lock.unlock();
+    try {
+      deletedBlockLog.addTransactions(containerBlocks);
+    } catch (IOException e) {
+      throw new IOException(
+          "Skip writing the deleted blocks info to"
+              + " the delLog because addTransaction fails. Batch skipped: "
+              + StringUtils.join(",", blockIDs), e);
     }
+    // TODO: Container report handling of the deleted blocks:
+    // Remove tombstone and update open container usage.
+    // We will revisit this when the closed container replication is done.
   }
 
   @Override

ContainerStateMap.java

@@ -31,7 +31,6 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +39,8 @@
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .CONTAINER_EXISTS;
@@ -108,7 +109,7 @@ public class ContainerStateMap {
   // Container State Map lock should be held before calling into
   // Update ContainerAttributes. The consistency of ContainerAttributes is
   // protected by this lock.
-  private final AutoCloseableLock autoLock;
+  private final ReadWriteLock lock;
 
   /**
    * Create a ContainerStateMap.
@@ -120,7 +121,7 @@ public ContainerStateMap() {
     typeMap = new ContainerAttribute<>();
     openPipelineMap = new ContainerAttribute<>();
     containerMap = new HashMap<>();
-    autoLock = new AutoCloseableLock();
+    lock = new ReentrantReadWriteLock();
     contReplicaMap = new HashMap<>();
     //        new InstrumentedLock(getClass().getName(), LOG,
     //            new ReentrantLock(),
@@ -140,7 +141,8 @@ public void addContainer(ContainerInfo info)
     Preconditions.checkArgument(info.getReplicationFactor().getNumber() > 0,
         "ExpectedReplicaCount should be greater than 0");
 
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       ContainerID id = ContainerID.valueof(info.getContainerID());
       if (containerMap.putIfAbsent(id, info) != null) {
         LOG.debug("Duplicate container ID detected. {}", id);
@@ -157,6 +159,8 @@ public void addContainer(ContainerInfo info)
         openPipelineMap.insert(info.getPipelineID(), id);
       }
       LOG.trace("Created container with {} successfully.", id);
+    } finally {
+      lock.writeLock().unlock();
     }
   }
@@ -177,8 +181,13 @@ public ContainerInfo getContainerInfo(ContainerInfo info) {
    * @return container info, if found.
    */
   public ContainerInfo getContainerInfo(long containerID) {
-    ContainerID id = new ContainerID(containerID);
-    return containerMap.get(id);
+    lock.readLock().lock();
+    try {
+      ContainerID id = new ContainerID(containerID);
+      return containerMap.get(id);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   /**
@@ -191,11 +200,14 @@ public ContainerInfo getContainerInfo(long containerID) {
   public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
       throws SCMException {
     Preconditions.checkNotNull(containerID);
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       if (contReplicaMap.containsKey(containerID)) {
         return Collections
             .unmodifiableSet(contReplicaMap.get(containerID));
       }
+    } finally {
+      lock.readLock().unlock();
     }
 
     throw new SCMException(
         "No entry exist for containerId: " + containerID + " in replica map.",
@@ -213,8 +225,8 @@ public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
   public void addContainerReplica(ContainerID containerID,
       DatanodeDetails... dnList) {
     Preconditions.checkNotNull(containerID);
-    // Take lock to avoid race condition around insertion.
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       for (DatanodeDetails dn : dnList) {
         Preconditions.checkNotNull(dn);
         if (contReplicaMap.containsKey(containerID)) {
@@ -228,6 +240,8 @@ public void addContainerReplica(ContainerID containerID,
           contReplicaMap.put(containerID, dnSet);
         }
       }
+    } finally {
+      lock.writeLock().unlock();
     }
   }
@@ -243,11 +257,13 @@ public boolean removeContainerReplica(ContainerID containerID,
     Preconditions.checkNotNull(containerID);
     Preconditions.checkNotNull(dn);
 
-    // Take lock to avoid race condition.
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       if (contReplicaMap.containsKey(containerID)) {
         return contReplicaMap.get(containerID).remove(dn);
       }
+    } finally {
+      lock.writeLock().unlock();
     }
     throw new SCMException(
         "No entry exist for containerId: " + containerID + " in replica map.",
@@ -265,8 +281,11 @@ public static Logger getLOG() {
    * @return - Map
    */
   public Map<ContainerID, ContainerInfo> getContainerMap() {
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return Collections.unmodifiableMap(containerMap);
+    } finally {
+      lock.readLock().unlock();
     }
   }
@@ -277,7 +296,8 @@ public Map<ContainerID, ContainerInfo> getContainerMap() {
   public void updateContainerInfo(ContainerInfo info) throws SCMException {
     Preconditions.checkNotNull(info);
     ContainerInfo currentInfo = null;
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       currentInfo = containerMap.get(
           ContainerID.valueof(info.getContainerID()));
@@ -285,6 +305,8 @@ public void updateContainerInfo(ContainerInfo info) throws SCMException {
       throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
     }
     containerMap.put(info.containerID(), info);
+    } finally {
+      lock.writeLock().unlock();
     }
   }
@@ -304,51 +326,56 @@ public void updateState(ContainerInfo info, LifeCycleState currentState,
     ContainerID id = new ContainerID(info.getContainerID());
     ContainerInfo currentInfo = null;
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-      currentInfo = containerMap.get(id);
+    lock.writeLock().lock();
+    try {
+      try {
+        currentInfo = containerMap.get(id);
 
-      if (currentInfo == null) {
-        throw new
-            SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+        if (currentInfo == null) {
+          throw new
+              SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+        }
+        // We are updating two places before this update is done, these can
+        // fail independently, since the code needs to handle it.
+        // We update the attribute map, if that fails it will throw an
+        // exception, so no issues, if we are successful, we keep track of the
+        // fact that we have updated the lifecycle state in the map, and update
+        // the container state. If this second update fails, we will attempt to
+        // roll back the earlier change we did. If the rollback fails, we can
+        // be in an inconsistent state,
+        info.setState(newState);
+        containerMap.put(id, info);
+        lifeCycleStateMap.update(currentState, newState, id);
+        LOG.trace("Updated the container {} to new state. Old = {}, new = " +
+            "{}", id, currentState, newState);
+      } catch (SCMException ex) {
+        LOG.error("Unable to update the container state. {}", ex);
+        // we need to revert the change in this attribute since we are not
+        // able to update the hash table.
+        LOG.info("Reverting the update to lifecycle state. Moving back to " +
+            "old state. Old = {}, Attempted state = {}", currentState,
+            newState);
+        containerMap.put(id, currentInfo);
+        // if this line throws, the state map can be in an inconsistent
+        // state, since we will have modified the attribute by the
+        // container state will not in sync since we were not able to put
+        // that into the hash table.
+        lifeCycleStateMap.update(newState, currentState, id);
+        throw new SCMException("Updating the container map failed.", ex,
+            FAILED_TO_CHANGE_CONTAINER_STATE);
       }
-      // We are updating two places before this update is done, these can
-      // fail independently, since the code needs to handle it.
-
-      // We update the attribute map, if that fails it will throw an exception,
-      // so no issues, if we are successful, we keep track of the fact that we
-      // have updated the lifecycle state in the map, and update the container
-      // state. If this second update fails, we will attempt to roll back the
-      // earlier change we did. If the rollback fails, we can be in an
-      // inconsistent state,
-
-      info.setState(newState);
-      containerMap.put(id, info);
-      lifeCycleStateMap.update(currentState, newState, id);
-      LOG.trace("Updated the container {} to new state. Old = {}, new = " +
-          "{}", id, currentState, newState);
-    } catch (SCMException ex) {
-      LOG.error("Unable to update the container state. {}", ex);
-      // we need to revert the change in this attribute since we are not
-      // able to update the hash table.
-      LOG.info("Reverting the update to lifecycle state. Moving back to " +
-          "old state. Old = {}, Attempted state = {}", currentState,
-          newState);
-      containerMap.put(id, currentInfo);
-      // if this line throws, the state map can be in an inconsistent
-      // state, since we will have modified the attribute by the
-      // container state will not in sync since we were not able to put
-      // that into the hash table.
-      lifeCycleStateMap.update(newState, currentState, id);
-      throw new SCMException("Updating the container map failed.", ex,
-          FAILED_TO_CHANGE_CONTAINER_STATE);
-    }
-    // In case the container is set to closed state, it needs to be removed from
-    // the pipeline Map.
-    if (!info.isContainerOpen()) {
-      openPipelineMap.remove(info.getPipelineID(), id);
+      // In case the container is set to closed state, it needs to be removed
+      // from the pipeline Map.
+      if (!info.isContainerOpen()) {
+        openPipelineMap.remove(info.getPipelineID(), id);
+      }
+    } finally {
+      lock.writeLock().unlock();
     }
   }
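
updateState() above performs a two-step mutation (attribute map, then container map) and tries to undo the first step when the second fails, accepting that a failed rollback leaves the maps inconsistent. A stripped-down sketch of that rollback idea, using placeholder types instead of the SCM state maps:

import java.util.HashMap;
import java.util.Map;

// Illustrative only: update two structures, rolling back the first if the
// second update fails; a failed rollback can leave them out of sync.
public class TwoStepUpdate {
  private final Map<String, Integer> countByState = new HashMap<>();
  private final Map<Long, String> stateById = new HashMap<>();

  public void updateState(long id, String oldState, String newState) {
    // Step 1: move the id between attribute buckets (may throw).
    moveAttribute(oldState, newState);
    try {
      // Step 2: update the primary map.
      updatePrimary(id, newState);
    } catch (RuntimeException ex) {
      // Roll back step 1; if this also throws, the two maps diverge.
      moveAttribute(newState, oldState);
      throw new IllegalStateException("Updating the container map failed.", ex);
    }
  }

  private void moveAttribute(String from, String to) {
    countByState.merge(from, -1, Integer::sum);
    countByState.merge(to, 1, Integer::sum);
  }

  private void updatePrimary(long id, String newState) {
    stateById.put(id, newState);   // stand-in for the container map update
  }
}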
@@ -360,9 +387,11 @@ public void updateState(ContainerInfo info, LifeCycleState currentState,
    */
   NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
     Preconditions.checkNotNull(ownerName);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return ownerMap.getCollection(ownerName);
+    } finally {
+      lock.readLock().unlock();
     }
   }
@@ -374,9 +403,11 @@ NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
    */
   NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
     Preconditions.checkNotNull(type);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return typeMap.getCollection(type);
+    } finally {
+      lock.readLock().unlock();
     }
   }
@@ -389,9 +420,11 @@ NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
   public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(
       PipelineID pipelineID) {
     Preconditions.checkNotNull(pipelineID);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return openPipelineMap.getCollection(pipelineID);
+    } finally {
+      lock.readLock().unlock();
     }
   }
@@ -403,9 +436,11 @@ public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(
    */
   NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
     Preconditions.checkNotNull(factor);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return factorMap.getCollection(factor);
+    } finally {
+      lock.readLock().unlock();
     }
   }
@@ -415,11 +450,14 @@ NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
    * @param state - State - Open, Closed etc.
    * @return List of containers by state.
    */
-  NavigableSet<ContainerID> getContainerIDsByState(LifeCycleState state) {
+  public NavigableSet<ContainerID> getContainerIDsByState(
+      LifeCycleState state) {
     Preconditions.checkNotNull(state);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return lifeCycleStateMap.getCollection(state);
+    } finally {
+      lock.readLock().unlock();
     }
   }
@@ -441,7 +479,8 @@ public NavigableSet<ContainerID> getMatchingContainerIDs(
     Preconditions.checkNotNull(factor, "Factor cannot be null");
     Preconditions.checkNotNull(type, "Type cannot be null");
 
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
 
       // If we cannot meet any one condition we return EMPTY_SET immediately.
       // Since when we intersect these sets, the result will be empty if any
@@ -479,6 +518,8 @@ public NavigableSet<ContainerID> getMatchingContainerIDs(
         currentSet = intersectSets(currentSet, sets[x]);
       }
       return currentSet;
+    } finally {
+      lock.readLock().unlock();
     }
   }