diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 270d356a901..cbd1ac243c5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -400,14 +400,14 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl new SCMPipelineManager(conf, scmNodeManager, eventQueue); } - if(configurator.getContainerManager() != null) { + if (configurator.getContainerManager() != null) { containerManager = configurator.getContainerManager(); } else { containerManager = new SCMContainerManager( conf, scmNodeManager, pipelineManager, eventQueue); } - if(configurator.getScmBlockManager() != null) { + if (configurator.getScmBlockManager() != null) { scmBlockManager = configurator.getScmBlockManager(); } else { scmBlockManager = new BlockManagerImpl(conf, this); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index a0c58db8eb0..e882657cc2b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -20,10 +20,17 @@ package org.apache.hadoop.ozone; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; @@ -33,6 +40,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -46,33 +54,44 @@ import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.block.DeletedBlockLog; import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ReplicationManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.TypedEvent; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.utils.HddsVersionInfo; - import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; import org.junit.rules.Timeout; +import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + /** * Test class that exercises the StorageContainerManager. */ @@ -80,8 +99,9 @@ public class TestStorageContainerManager { private static XceiverClientManager xceiverClientManager = new XceiverClientManager( new OzoneConfiguration()); - private static final Logger LOG = LoggerFactory - .getLogger(TestStorageContainerManager.class); + private static final Logger LOG = LoggerFactory.getLogger( + TestStorageContainerManager.class); + /** * Set the timeout for every test. */ @@ -94,6 +114,9 @@ public class TestStorageContainerManager { @Rule public ExpectedException exception = ExpectedException.none(); + @Rule + public TemporaryFolder folder= new TemporaryFolder(); + @Test public void testRpcPermission() throws Exception { // Test with default configuration @@ -119,7 +142,7 @@ public class TestStorageContainerManager { SCMClientProtocolServer mockClientServer = Mockito.spy( cluster.getStorageContainerManager().getClientProtocolServer()); - Mockito.when(mockClientServer.getRpcRemoteUsername()) + when(mockClientServer.getRpcRemoteUsername()) .thenReturn(fakeRemoteUsername); try { @@ -405,7 +428,6 @@ public class TestStorageContainerManager { StorageContainerManager.scmInit(conf); Assert.assertEquals(NodeType.SCM, scmStore.getNodeType()); Assert.assertEquals("testClusterId", scmStore.getClusterID()); - } @Test @@ -482,4 +504,95 @@ public class TestStorageContainerManager { Assert.assertEquals(expectedVersion, actualVersion); } + @Test + @SuppressWarnings("unchecked") + public void testCloseContainerCommandOnRestart() throws Exception { + int numKeys = 15; + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, TimeUnit.SECONDS); + conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5); + conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL, + 100, TimeUnit.MILLISECONDS); + conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, + numKeys); + + MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf) + .setHbInterval(1000) + .setHbProcessorInterval(3000) + .setTrace(false) + .setNumDatanodes(1) + .build(); + cluster.waitForClusterToBeReady(); + + TestStorageContainerManagerHelper helper = + new TestStorageContainerManagerHelper(cluster, conf); + + helper.createKeys(10, 4096); + Thread.sleep(5000); + + StorageContainerManager scm = cluster.getStorageContainerManager(); + List containers = cluster.getStorageContainerManager() + .getContainerManager().getContainers(); + Assert.assertNotNull(containers); + ContainerInfo selectedContainer = containers.iterator().next(); + + // Stop processing HB + scm.getDatanodeProtocolServer().stop(); + EventPublisher publisher = mock(EventPublisher.class); + ReplicationManager replicationManager = scm.getReplicationManager(); + Field f = replicationManager.getClass().getDeclaredField("eventPublisher"); + f.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL); + f.set(replicationManager, publisher); + + doNothing().when(publisher).fireEvent(any(TypedEvent.class), + any(CommandForDatanode.class)); + + scm.getContainerManager().updateContainerState(selectedContainer + .containerID(), HddsProtos.LifeCycleEvent.FINALIZE); + cluster.restartStorageContainerManager(true); + scm.getReplicationManager().start(); + Thread.sleep(2000); + + UUID dnUuid = cluster.getHddsDatanodes().iterator().next() + .getDatanodeDetails().getUuid(); + + CloseContainerCommand closeContainerCommand = + new CloseContainerCommand(selectedContainer.getContainerID(), + selectedContainer.getPipelineID(), false); + + CommandForDatanode commandForDatanode = new CommandForDatanode( + dnUuid, closeContainerCommand); + + verify(publisher).fireEvent(eq(SCMEvents.DATANODE_COMMAND), argThat(new + CloseContainerCommandMatcher(dnUuid, commandForDatanode))); + } + + @SuppressWarnings("visibilitymodifier") + static class CloseContainerCommandMatcher + extends ArgumentMatcher { + + private final CommandForDatanode cmd; + private final UUID uuid; + + CloseContainerCommandMatcher(UUID uuid, CommandForDatanode cmd) { + this.uuid = uuid; + this.cmd = cmd; + } + + @Override + public boolean matches(Object argument) { + CommandForDatanode cmdRight = (CommandForDatanode) argument; + CloseContainerCommand left = (CloseContainerCommand) cmd.getCommand(); + CloseContainerCommand right = + (CloseContainerCommand) cmdRight.getCommand(); + return cmdRight.getDatanodeId().equals(uuid) + && left.getContainerID() == right.getContainerID() + && left.getPipelineID() == right.getPipelineID() + && left.getType() == right.getType() + && left.getProto().equals(right.getProto()); + } + } }