diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 953f71eaecd..6825ca42bf1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.hdds.client.BlockID; @@ -87,10 +88,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { * @param conf - configuration. * @param nodeManager - node manager. * @param containerManager - container manager. + * @param eventPublisher - event publisher. * @throws IOException */ public BlockManagerImpl(final Configuration conf, - final NodeManager nodeManager, final Mapping containerManager) + final NodeManager nodeManager, final Mapping containerManager, + EventPublisher eventPublisher) throws IOException { this.nodeManager = nodeManager; this.containerManager = containerManager; @@ -120,9 +123,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); blockDeletingService = - new SCMBlockDeletingService( - deletedBlockLog, containerManager, nodeManager, svcInterval, - serviceTimeout, conf); + new SCMBlockDeletingService(deletedBlockLog, containerManager, + nodeManager, eventPublisher, svcInterval, serviceTimeout, conf); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java index 2c555e04212..6f65fdd2c92 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java @@ -20,11 +20,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.util.Time; import org.apache.hadoop.utils.BackgroundService; @@ -61,6 +64,7 @@ public class SCMBlockDeletingService extends BackgroundService { private final DeletedBlockLog deletedBlockLog; private final Mapping mappingService; private final NodeManager nodeManager; + private final EventPublisher eventPublisher; // Block delete limit size is dynamically calculated based on container // delete limit size (ozone.block.deleting.container.limit.per.interval) @@ -76,13 +80,14 @@ public class SCMBlockDeletingService extends BackgroundService { private int blockDeleteLimitSize; public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog, - Mapping mapper, NodeManager nodeManager, - long interval, long serviceTimeout, Configuration conf) { + Mapping mapper, NodeManager nodeManager, EventPublisher eventPublisher, + long interval, long serviceTimeout, Configuration conf) { super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout); this.deletedBlockLog = deletedBlockLog; this.mappingService = mapper; this.nodeManager = nodeManager; + this.eventPublisher = eventPublisher; int containerLimit = conf.getInt( OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, @@ -145,8 +150,8 @@ public class SCMBlockDeletingService extends BackgroundService { // We should stop caching new commands if num of un-processed // command is bigger than a limit, e.g 50. In case datanode goes // offline for sometime, the cached commands be flooded. - nodeManager.addDatanodeCommand(dnId, - new DeleteBlocksCommand(dnTXs)); + eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, + new CommandForDatanode<>(dnId, new DeleteBlocksCommand(dnTXs))); LOG.debug( "Added delete block command for datanode {} in the queue," + " number of delete block transactions: {}, TxID list: {}", diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 5f511eee31c..f37a0ed6ff8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -181,7 +181,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl scmContainerManager = new ContainerMapping( conf, getScmNodeManager(), cacheSize); scmBlockManager = new BlockManagerImpl( - conf, getScmNodeManager(), scmContainerManager); + conf, getScmNodeManager(), scmContainerManager, eventQueue); Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java index 9fbb9fa5f55..06e74206cdc 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java @@ -74,7 +74,7 @@ public class TestBlockManager { } nodeManager = new MockNodeManager(true, 10); mapping = new ContainerMapping(conf, nodeManager, 128); - blockManager = new BlockManagerImpl(conf, nodeManager, mapping); + blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null); if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT)){ factor = HddsProtos.ReplicationFactor.THREE; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java index 1a1f37ca375..a878627aa01 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.ozone.scm; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -117,7 +116,7 @@ public class TestContainerSQLCli { nodeManager = cluster.getStorageContainerManager().getScmNodeManager(); mapping = new ContainerMapping(conf, nodeManager, 128); - blockManager = new BlockManagerImpl(conf, nodeManager, mapping); + blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null); // blockManager.allocateBlock() will create containers if there is none // stored in levelDB. The number of containers to create is the value of