diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 82d9a28bb03..e4e33c7227e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.hadoop.hdds.scm.block;
 
-import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -45,8 +44,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .CHILL_MODE_EXCEPTION;
@@ -72,7 +71,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
   private final NodeManager nodeManager;
   private final Mapping containerManager;
-  private final Lock lock;
+  private final ReadWriteLock lock;
   private final long containerSize;
 
   private final DeletedBlockLog deletedBlockLog;
@@ -108,7 +107,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
         ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
     rand = new Random();
-    this.lock = new ReentrantLock();
+    this.lock = new ReentrantReadWriteLock();
 
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
 
@@ -155,29 +154,22 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
    * @param factor - how many copies needed for this container.
    * @throws IOException
    */
-  private void preAllocateContainers(int count, ReplicationType type,
-      ReplicationFactor factor, String owner)
+  private synchronized void preAllocateContainers(int count,
+      ReplicationType type, ReplicationFactor factor, String owner)
       throws IOException {
-    lock.lock();
-    try {
-      for (int i = 0; i < count; i++) {
-        ContainerWithPipeline containerWithPipeline;
-        try {
-          // TODO: Fix this later when Ratis is made the Default.
-          containerWithPipeline = containerManager.allocateContainer(
-              type, factor, owner);
+    for (int i = 0; i < count; i++) {
+      ContainerWithPipeline containerWithPipeline;
+      try {
+        // TODO: Fix this later when Ratis is made the Default.
+        containerWithPipeline = containerManager.allocateContainer(
+            type, factor, owner);
 
-          if (containerWithPipeline == null) {
-            LOG.warn("Unable to allocate container.");
-            continue;
-          }
-        } catch (IOException ex) {
-          LOG.warn("Unable to allocate container: {}", ex);
-          continue;
+        if (containerWithPipeline == null) {
+          LOG.warn("Unable to allocate container.");
         }
+      } catch (IOException ex) {
+        LOG.warn("Unable to allocate container: {}", ex);
       }
-    } finally {
-      lock.unlock();
     }
   }
 
@@ -208,46 +200,61 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           CHILL_MODE_EXCEPTION);
     }
 
-    lock.lock();
+    /*
+      Here is the high level logic.
+
+      1. First we check if there are containers in ALLOCATED state, that is
+      SCM has allocated them in the SCM namespace but the corresponding
+      container has not been created in the Datanode yet. If we have any in
+      that state, we will return that to the client, which allows the client
+      to finish creating those containers. This is a sort of greedy
+      algorithm; our primary purpose is to get as many containers as
+      possible.
+
+      2. If there are no allocated containers -- then we find an Open
+      container that matches that pattern.
+
+      3. If both of them fail, then we will pre-allocate a bunch of
+      containers in SCM and try again.
+
+      TODO : Support random picking of two containers from the list. So we
+      can use different kinds of policies.
+    */
+
+    ContainerWithPipeline containerWithPipeline;
+
+    lock.readLock().lock();
     try {
-      /*
-        Here is the high level logic.
-
-        1. First we check if there are containers in ALLOCATED state,
-        that is
-         SCM has allocated them in the SCM namespace but the
-         corresponding
-        container has not been created in the Datanode yet. If we
-        have any
-        in that state, we will return that to the client, which allows
-        client to finish creating those containers. This is a sort of
-        greedy
-         algorithm, our primary purpose is to get as many containers as
-         possible.
-
-        2. If there are no allocated containers -- Then we find a Open
-        container that matches that pattern.
-
-        3. If both of them fail, the we will pre-allocate a bunch of
-        conatainers in SCM and try again.
-
-        TODO : Support random picking of two containers from the list.
-        So we
-        can use different kind of policies.
-      */
-
-      ContainerWithPipeline containerWithPipeline;
-
-      // Look for ALLOCATED container that matches all other parameters.
-      containerWithPipeline = containerManager
-          .getMatchingContainerWithPipeline(size, owner, type, factor,
-              HddsProtos.LifeCycleState.ALLOCATED);
-      if (containerWithPipeline != null) {
-        containerManager.updateContainerState(
-            containerWithPipeline.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-        return newBlock(containerWithPipeline,
-            HddsProtos.LifeCycleState.ALLOCATED);
+      // This is to optimize performance; if the below condition evaluates
+      // to false, then we can be sure that there are no containers in
+      // ALLOCATED state.
+      // This can result in a false positive, but it will never be a false
+      // negative. How can this result in a false positive? We check if
+      // there are any containers in ALLOCATED state, but this check doesn't
+      // care about the USER of the containers. So there might be cases
+      // where a different USER has a few containers in ALLOCATED state,
+      // which will result in a false positive.
+      if (!containerManager.getStateManager().getContainerStateMap()
+          .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
+          .isEmpty()) {
+        // Since the above check can result in a false positive, we have to
+        // do the actual check and find out if there are containers in
+        // ALLOCATED state matching our criteria.
+        synchronized (this) {
+          // Using containers from ALLOCATED state should be done within a
+          // synchronized block (or) a write lock. Since we already hold a
+          // read lock, we would end up in a deadlock situation if we took
+          // the write lock here.
+          containerWithPipeline = containerManager
+              .getMatchingContainerWithPipeline(size, owner, type, factor,
+                  HddsProtos.LifeCycleState.ALLOCATED);
+          if (containerWithPipeline != null) {
+            containerManager.updateContainerState(
+                containerWithPipeline.getContainerInfo().getContainerID(),
+                HddsProtos.LifeCycleEvent.CREATE);
+            return newBlock(containerWithPipeline,
+                HddsProtos.LifeCycleState.ALLOCATED);
+          }
+        }
       }
 
       // Since we found no allocated containers that match our criteria, let us
@@ -263,20 +270,34 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
       // that most of our containers are full or we have not allocated
       // containers of the type and replication factor. So let us go and
       // allocate some.
-      preAllocateContainers(containerProvisionBatchSize, type, factor, owner);
 
-      // Since we just allocated a set of containers this should work
-      containerWithPipeline = containerManager
-          .getMatchingContainerWithPipeline(size, owner, type, factor,
+      // Even though we have already checked the containers in ALLOCATED
+      // state, we have to check again as we only hold a read lock.
+      // Some other thread might have pre-allocated a container in the
+      // meantime.
+      synchronized (this) {
+        if (!containerManager.getStateManager().getContainerStateMap()
+            .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
+            .isEmpty()) {
+          containerWithPipeline = containerManager
+              .getMatchingContainerWithPipeline(size, owner, type, factor,
+                  HddsProtos.LifeCycleState.ALLOCATED);
+        }
+        if (containerWithPipeline == null) {
+          preAllocateContainers(containerProvisionBatchSize,
+              type, factor, owner);
+          containerWithPipeline = containerManager
+              .getMatchingContainerWithPipeline(size, owner, type, factor,
+                  HddsProtos.LifeCycleState.ALLOCATED);
+        }
+
+        if (containerWithPipeline != null) {
+          containerManager.updateContainerState(
+              containerWithPipeline.getContainerInfo().getContainerID(),
+              HddsProtos.LifeCycleEvent.CREATE);
+          return newBlock(containerWithPipeline,
               HddsProtos.LifeCycleState.ALLOCATED);
-      if (containerWithPipeline != null) {
-        containerManager.updateContainerState(
-            containerWithPipeline.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-        return newBlock(containerWithPipeline,
-            HddsProtos.LifeCycleState.ALLOCATED);
+        }
       }
-
       // we have tried all strategies we know and but somehow we are not able
       // to get a container for this block. Log that info and return a null.
       LOG.error(
@@ -286,19 +307,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
          "Unable to allocate a block for the size: {}, type: {}, factor: {}",
          size, type, factor);
     return null;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  private String getChannelName(ReplicationType type) {
-    switch (type) {
-    case RATIS:
-      return "RA" + UUID.randomUUID().toString().substring(3);
-    case STAND_ALONE:
-      return "SA" + UUID.randomUUID().toString().substring(3);
-    default:
-      return "RA" + UUID.randomUUID().toString().substring(3);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -353,40 +364,34 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           CHILL_MODE_EXCEPTION);
     }
 
-    lock.lock();
     LOG.info("Deleting blocks {}", StringUtils.join(",", blockIDs));
     Map<Long, List<Long>> containerBlocks = new HashMap<>();
     // TODO: track the block size info so that we can reclaim the container
    // TODO: used space when the block is deleted.
-    try {
-      for (BlockID block : blockIDs) {
-        // Merge blocks to a container to blocks mapping,
-        // prepare to persist this info to the deletedBlocksLog.
-        long containerID = block.getContainerID();
-        if (containerBlocks.containsKey(containerID)) {
-          containerBlocks.get(containerID).add(block.getLocalID());
-        } else {
-          List<Long> item = new ArrayList<>();
-          item.add(block.getLocalID());
-          containerBlocks.put(containerID, item);
-        }
+    for (BlockID block : blockIDs) {
+      // Merge blocks into a container-to-blocks mapping and
+      // prepare to persist this info to the deletedBlocksLog.
+      long containerID = block.getContainerID();
+      if (containerBlocks.containsKey(containerID)) {
+        containerBlocks.get(containerID).add(block.getLocalID());
+      } else {
+        List<Long> item = new ArrayList<>();
+        item.add(block.getLocalID());
+        containerBlocks.put(containerID, item);
       }
-
-      try {
-        deletedBlockLog.addTransactions(containerBlocks);
-      } catch (IOException e) {
-        throw new IOException(
-            "Skip writing the deleted blocks info to"
-                + " the delLog because addTransaction fails. Batch skipped: "
-                + StringUtils.join(",", blockIDs),
-            e);
-      }
-      // TODO: Container report handling of the deleted blocks:
-      // Remove tombstone and update open container usage.
-      // We will revisit this when the closed container replication is done.
-    } finally {
-      lock.unlock();
     }
+
+    try {
+      deletedBlockLog.addTransactions(containerBlocks);
+    } catch (IOException e) {
+      throw new IOException(
+          "Skip writing the deleted blocks info to"
+              + " the delLog because addTransaction fails. Batch skipped: "
+              + StringUtils.join(",", blockIDs), e);
+    }
+    // TODO: Container report handling of the deleted blocks:
+    // Remove tombstone and update open container usage.
+    // We will revisit this when the closed container replication is done.
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 4d34cb76bb6..8cb65cfa94c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +39,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .CONTAINER_EXISTS;
@@ -108,7 +109,7 @@ public class ContainerStateMap {
   // Container State Map lock should be held before calling into
   // Update ContainerAttributes. The consistency of ContainerAttributes is
   // protected by this lock.
-  private final AutoCloseableLock autoLock;
+  private final ReadWriteLock lock;
 
   /**
    * Create a ContainerStateMap.
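For context on the mechanical conversion this patch performs throughout ContainerStateMap: the old code guarded every accessor with a single exclusive AutoCloseableLock released via try-with-resources, while the new code distinguishes readers from writers with a ReentrantReadWriteLock released in a finally block. A minimal standalone sketch of the two idioms, not part of the patch (the cache field and get/put methods are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical container-info cache, used only to contrast the idioms.
public class LockIdiomSketch {
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private final Map<Long, String> cache = new HashMap<>();

  // Old idiom (for comparison): one exclusive lock, auto-released by
  // try-with-resources, so readers serialize behind each other:
  //
  //   try (AutoCloseableLock l = autoLock.acquire()) {
  //     return cache.get(id);
  //   }

  // New idiom: lookups share the read lock, so they can run in parallel;
  // the unlock must be done explicitly in a finally block.
  public String get(long id) {
    lock.readLock().lock();
    try {
      return cache.get(id);
    } finally {
      lock.readLock().unlock();
    }
  }

  // Mutations take the exclusive write lock, blocking all readers.
  public void put(long id, String value) {
    lock.writeLock().lock();
    try {
      cache.put(id, value);
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```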
@@ -120,7 +121,7 @@ public class ContainerStateMap {
     typeMap = new ContainerAttribute<>();
     openPipelineMap = new ContainerAttribute<>();
     containerMap = new HashMap<>();
-    autoLock = new AutoCloseableLock();
+    lock = new ReentrantReadWriteLock();
     contReplicaMap = new HashMap<>();
     //        new InstrumentedLock(getClass().getName(), LOG,
     //            new ReentrantLock(),
@@ -140,7 +141,8 @@ public class ContainerStateMap {
     Preconditions.checkArgument(info.getReplicationFactor().getNumber() > 0,
         "ExpectedReplicaCount should be greater than 0");
 
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       ContainerID id = ContainerID.valueof(info.getContainerID());
       if (containerMap.putIfAbsent(id, info) != null) {
         LOG.debug("Duplicate container ID detected. {}", id);
@@ -157,6 +159,8 @@ public class ContainerStateMap {
         openPipelineMap.insert(info.getPipelineID(), id);
       }
       LOG.trace("Created container with {} successfully.", id);
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
@@ -177,8 +181,13 @@ public class ContainerStateMap {
    * @return container info, if found.
    */
   public ContainerInfo getContainerInfo(long containerID) {
-    ContainerID id = new ContainerID(containerID);
-    return containerMap.get(id);
+    lock.readLock().lock();
+    try {
+      ContainerID id = new ContainerID(containerID);
+      return containerMap.get(id);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   /**
@@ -191,11 +200,14 @@ public class ContainerStateMap {
   public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
       throws SCMException {
     Preconditions.checkNotNull(containerID);
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       if (contReplicaMap.containsKey(containerID)) {
         return Collections
             .unmodifiableSet(contReplicaMap.get(containerID));
       }
+    } finally {
+      lock.readLock().unlock();
     }
     throw new SCMException(
         "No entry exist for containerId: " + containerID + " in replica map.",
@@ -213,8 +225,8 @@ public class ContainerStateMap {
   public void addContainerReplica(ContainerID containerID,
       DatanodeDetails... dnList) {
     Preconditions.checkNotNull(containerID);
-    // Take lock to avoid race condition around insertion.
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       for (DatanodeDetails dn : dnList) {
         Preconditions.checkNotNull(dn);
         if (contReplicaMap.containsKey(containerID)) {
@@ -228,6 +240,8 @@ public class ContainerStateMap {
           contReplicaMap.put(containerID, dnSet);
         }
       }
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
@@ -243,11 +257,13 @@ public class ContainerStateMap {
     Preconditions.checkNotNull(containerID);
     Preconditions.checkNotNull(dn);
 
-    // Take lock to avoid race condition.
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       if (contReplicaMap.containsKey(containerID)) {
         return contReplicaMap.get(containerID).remove(dn);
       }
+    } finally {
+      lock.writeLock().unlock();
     }
     throw new SCMException(
         "No entry exist for containerId: " + containerID + " in replica map.",
@@ -265,8 +281,11 @@ public class ContainerStateMap {
    * @return - Map
    */
   public Map<ContainerID, ContainerInfo> getContainerMap() {
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return Collections.unmodifiableMap(containerMap);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -277,7 +296,8 @@ public class ContainerStateMap {
   public void updateContainerInfo(ContainerInfo info) throws SCMException {
     Preconditions.checkNotNull(info);
     ContainerInfo currentInfo = null;
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       currentInfo = containerMap.get(
           ContainerID.valueof(info.getContainerID()));
 
@@ -285,6 +305,8 @@ public class ContainerStateMap {
         throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
       }
       containerMap.put(info.containerID(), info);
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
@@ -304,51 +326,56 @@ public class ContainerStateMap {
     ContainerID id = new ContainerID(info.getContainerID());
     ContainerInfo currentInfo = null;
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-      currentInfo = containerMap.get(id);
+    lock.writeLock().lock();
+    try {
+      try {
+        currentInfo = containerMap.get(id);
 
-      if (currentInfo == null) {
-        throw new
-            SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+        if (currentInfo == null) {
+          throw new
+              SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+        }
+        // We are updating two places before this update is done; these can
+        // fail independently, so the code needs to handle it.
+
+        // We update the attribute map; if that fails it will throw an
+        // exception, so no issues. If we are successful, we keep track of
+        // the fact that we have updated the lifecycle state in the map, and
+        // update the container state. If this second update fails, we will
+        // attempt to roll back the earlier change we did. If the rollback
+        // fails, we can be in an inconsistent state.
+
+        info.setState(newState);
+        containerMap.put(id, info);
+        lifeCycleStateMap.update(currentState, newState, id);
+        LOG.trace("Updated the container {} to new state. Old = {}, new = " +
+            "{}", id, currentState, newState);
+      } catch (SCMException ex) {
+        LOG.error("Unable to update the container state. {}", ex);
+        // We need to revert the change in this attribute since we are not
+        // able to update the hash table.
+        LOG.info("Reverting the update to lifecycle state. Moving back to " +
+            "old state. Old = {}, Attempted state = {}", currentState,
+            newState);
+
+        containerMap.put(id, currentInfo);
+
+        // If this line throws, the state map can be in an inconsistent
+        // state, since we will have modified the attribute but the
+        // container state will not be in sync, as we were not able to put
+        // it into the hash table.
+        lifeCycleStateMap.update(newState, currentState, id);
+
+        throw new SCMException("Updating the container map failed.", ex,
+            FAILED_TO_CHANGE_CONTAINER_STATE);
       }
-      // We are updating two places before this update is done, these can
-      // fail independently, since the code needs to handle it.
-
-      // We update the attribute map, if that fails it will throw an exception,
-      // so no issues, if we are successful, we keep track of the fact that we
-      // have updated the lifecycle state in the map, and update the container
-      // state. If this second update fails, we will attempt to roll back the
-      // earlier change we did. If the rollback fails, we can be in an
-      // inconsistent state,
-
-      info.setState(newState);
-      containerMap.put(id, info);
-      lifeCycleStateMap.update(currentState, newState, id);
-      LOG.trace("Updated the container {} to new state. Old = {}, new = " +
-          "{}", id, currentState, newState);
-    } catch (SCMException ex) {
-      LOG.error("Unable to update the container state. {}", ex);
-      // we need to revert the change in this attribute since we are not
-      // able to update the hash table.
-      LOG.info("Reverting the update to lifecycle state. Moving back to " +
-          "old state. Old = {}, Attempted state = {}", currentState,
-          newState);
-
-      containerMap.put(id, currentInfo);
-
-      // if this line throws, the state map can be in an inconsistent
-      // state, since we will have modified the attribute by the
-      // container state will not in sync since we were not able to put
-      // that into the hash table.
-      lifeCycleStateMap.update(newState, currentState, id);
-
-      throw new SCMException("Updating the container map failed.", ex,
-          FAILED_TO_CHANGE_CONTAINER_STATE);
-    }
-    // In case the container is set to closed state, it needs to be removed from
-    // the pipeline Map.
-    if (!info.isContainerOpen()) {
-      openPipelineMap.remove(info.getPipelineID(), id);
+      // In case the container is set to closed state, it needs to be removed
+      // from the pipeline Map.
+      if (!info.isContainerOpen()) {
+        openPipelineMap.remove(info.getPipelineID(), id);
+      }
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
@@ -360,9 +387,11 @@ public class ContainerStateMap {
    */
   NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
     Preconditions.checkNotNull(ownerName);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return ownerMap.getCollection(ownerName);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -374,9 +403,11 @@ public class ContainerStateMap {
    */
   NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
     Preconditions.checkNotNull(type);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return typeMap.getCollection(type);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -389,9 +420,11 @@ public class ContainerStateMap {
   public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(
       PipelineID pipelineID) {
     Preconditions.checkNotNull(pipelineID);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return openPipelineMap.getCollection(pipelineID);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -403,9 +436,11 @@ public class ContainerStateMap {
    */
   NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
     Preconditions.checkNotNull(factor);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return factorMap.getCollection(factor);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -415,11 +450,14 @@ public class ContainerStateMap {
    * @param state - State - Open, Closed etc.
    * @return List of containers by state.
   */
-  NavigableSet<ContainerID> getContainerIDsByState(LifeCycleState state) {
+  public NavigableSet<ContainerID> getContainerIDsByState(
+      LifeCycleState state) {
     Preconditions.checkNotNull(state);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return lifeCycleStateMap.getCollection(state);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -441,7 +479,8 @@ public class ContainerStateMap {
     Preconditions.checkNotNull(factor, "Factor cannot be null");
     Preconditions.checkNotNull(type, "Type cannot be null");
 
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
 
       // If we cannot meet any one condition we return EMPTY_SET immediately.
       // Since when we intersect these sets, the result will be empty if any
@@ -479,6 +518,8 @@ public class ContainerStateMap {
         currentSet = intersectSets(currentSet, sets[x]);
       }
       return currentSet;
+    } finally {
+      lock.readLock().unlock();
     }
   }
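The allocateBlock() rewrite above hinges on a property of ReentrantReadWriteLock that the in-code comment only hints at: a thread holding the read lock cannot upgrade to the write lock; the acquire would block forever. So the patch serializes the ALLOCATED-state path on the object monitor instead, and re-checks the condition inside the synchronized block. A standalone sketch of that read-lock-plus-synchronized double-check pattern, under the assumption that an `allocated` counter stands in for the set of ALLOCATED containers (this is illustrative, not SCM code):

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Standalone sketch; `allocated` is a hypothetical stand-in for the
// number of containers in ALLOCATED state.
public class AllocateSketch {
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private volatile int allocated; // mutated only inside synchronized blocks

  /** Mirrors the allocateBlock() fast path. */
  public boolean consumeAllocated() {
    lock.readLock().lock();
    try {
      // Cheap pre-check under the shared read lock. Like the patch's
      // isEmpty() check, it tolerates false positives: another thread may
      // consume the last unit before we enter the synchronized block.
      if (allocated > 0) {
        // Calling lock.writeLock().lock() here would self-deadlock,
        // because ReentrantReadWriteLock does not allow upgrading a held
        // read lock to the write lock. Hence synchronized(this) instead.
        synchronized (this) {
          // Double-check: state may have changed since the pre-check.
          if (allocated > 0) {
            allocated--;
            return true;
          }
        }
      }
      return false;
    } finally {
      lock.readLock().unlock();
    }
  }

  /** Mirrors preAllocateContainers(), which the patch made synchronized. */
  public synchronized void preAllocate(int count) {
    allocated += count;
  }
}
```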