HDDS-1411. Add unit test to check if SCM correctly sends close commands for containers in closing state after a restart. (#755)
This commit is contained in:
parent
7e1f8d3a1b
commit
59ded7641f
|
@ -400,14 +400,14 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
new SCMPipelineManager(conf, scmNodeManager, eventQueue);
|
||||
}
|
||||
|
||||
if(configurator.getContainerManager() != null) {
|
||||
if (configurator.getContainerManager() != null) {
|
||||
containerManager = configurator.getContainerManager();
|
||||
} else {
|
||||
containerManager = new SCMContainerManager(
|
||||
conf, scmNodeManager, pipelineManager, eventQueue);
|
||||
}
|
||||
|
||||
if(configurator.getScmBlockManager() != null) {
|
||||
if (configurator.getScmBlockManager() != null) {
|
||||
scmBlockManager = configurator.getScmBlockManager();
|
||||
} else {
|
||||
scmBlockManager = new BlockManagerImpl(conf, this);
|
||||
|
|
|
@ -20,10 +20,17 @@ package org.apache.hadoop.ozone;
|
|||
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Collections;
|
||||
|
@ -33,6 +40,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
|
@ -46,33 +54,44 @@ import org.apache.hadoop.hdds.scm.ScmInfo;
|
|||
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
|
||||
import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
|
||||
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.apache.hadoop.hdds.server.events.TypedEvent;
|
||||
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
|
||||
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.utils.HddsVersionInfo;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* Test class that exercises the StorageContainerManager.
|
||||
*/
|
||||
|
@ -80,8 +99,9 @@ public class TestStorageContainerManager {
|
|||
private static XceiverClientManager xceiverClientManager =
|
||||
new XceiverClientManager(
|
||||
new OzoneConfiguration());
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(TestStorageContainerManager.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
TestStorageContainerManager.class);
|
||||
|
||||
/**
|
||||
* Set the timeout for every test.
|
||||
*/
|
||||
|
@ -94,6 +114,9 @@ public class TestStorageContainerManager {
|
|||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder folder= new TemporaryFolder();
|
||||
|
||||
@Test
|
||||
public void testRpcPermission() throws Exception {
|
||||
// Test with default configuration
|
||||
|
@ -119,7 +142,7 @@ public class TestStorageContainerManager {
|
|||
|
||||
SCMClientProtocolServer mockClientServer = Mockito.spy(
|
||||
cluster.getStorageContainerManager().getClientProtocolServer());
|
||||
Mockito.when(mockClientServer.getRpcRemoteUsername())
|
||||
when(mockClientServer.getRpcRemoteUsername())
|
||||
.thenReturn(fakeRemoteUsername);
|
||||
|
||||
try {
|
||||
|
@ -405,7 +428,6 @@ public class TestStorageContainerManager {
|
|||
StorageContainerManager.scmInit(conf);
|
||||
Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
|
||||
Assert.assertEquals("testClusterId", scmStore.getClusterID());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -482,4 +504,95 @@ public class TestStorageContainerManager {
|
|||
Assert.assertEquals(expectedVersion, actualVersion);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCloseContainerCommandOnRestart() throws Exception {
|
||||
int numKeys = 15;
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, TimeUnit.SECONDS);
|
||||
conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
|
||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
|
||||
100, TimeUnit.MILLISECONDS);
|
||||
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
|
||||
numKeys);
|
||||
|
||||
MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
|
||||
.setHbInterval(1000)
|
||||
.setHbProcessorInterval(3000)
|
||||
.setTrace(false)
|
||||
.setNumDatanodes(1)
|
||||
.build();
|
||||
cluster.waitForClusterToBeReady();
|
||||
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(cluster, conf);
|
||||
|
||||
helper.createKeys(10, 4096);
|
||||
Thread.sleep(5000);
|
||||
|
||||
StorageContainerManager scm = cluster.getStorageContainerManager();
|
||||
List<ContainerInfo> containers = cluster.getStorageContainerManager()
|
||||
.getContainerManager().getContainers();
|
||||
Assert.assertNotNull(containers);
|
||||
ContainerInfo selectedContainer = containers.iterator().next();
|
||||
|
||||
// Stop processing HB
|
||||
scm.getDatanodeProtocolServer().stop();
|
||||
EventPublisher publisher = mock(EventPublisher.class);
|
||||
ReplicationManager replicationManager = scm.getReplicationManager();
|
||||
Field f = replicationManager.getClass().getDeclaredField("eventPublisher");
|
||||
f.setAccessible(true);
|
||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||
modifiersField.setAccessible(true);
|
||||
modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
|
||||
f.set(replicationManager, publisher);
|
||||
|
||||
doNothing().when(publisher).fireEvent(any(TypedEvent.class),
|
||||
any(CommandForDatanode.class));
|
||||
|
||||
scm.getContainerManager().updateContainerState(selectedContainer
|
||||
.containerID(), HddsProtos.LifeCycleEvent.FINALIZE);
|
||||
cluster.restartStorageContainerManager(true);
|
||||
scm.getReplicationManager().start();
|
||||
Thread.sleep(2000);
|
||||
|
||||
UUID dnUuid = cluster.getHddsDatanodes().iterator().next()
|
||||
.getDatanodeDetails().getUuid();
|
||||
|
||||
CloseContainerCommand closeContainerCommand =
|
||||
new CloseContainerCommand(selectedContainer.getContainerID(),
|
||||
selectedContainer.getPipelineID(), false);
|
||||
|
||||
CommandForDatanode commandForDatanode = new CommandForDatanode(
|
||||
dnUuid, closeContainerCommand);
|
||||
|
||||
verify(publisher).fireEvent(eq(SCMEvents.DATANODE_COMMAND), argThat(new
|
||||
CloseContainerCommandMatcher(dnUuid, commandForDatanode)));
|
||||
}
|
||||
|
||||
@SuppressWarnings("visibilitymodifier")
|
||||
static class CloseContainerCommandMatcher
|
||||
extends ArgumentMatcher<CommandForDatanode> {
|
||||
|
||||
private final CommandForDatanode cmd;
|
||||
private final UUID uuid;
|
||||
|
||||
CloseContainerCommandMatcher(UUID uuid, CommandForDatanode cmd) {
|
||||
this.uuid = uuid;
|
||||
this.cmd = cmd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(Object argument) {
|
||||
CommandForDatanode cmdRight = (CommandForDatanode) argument;
|
||||
CloseContainerCommand left = (CloseContainerCommand) cmd.getCommand();
|
||||
CloseContainerCommand right =
|
||||
(CloseContainerCommand) cmdRight.getCommand();
|
||||
return cmdRight.getDatanodeId().equals(uuid)
|
||||
&& left.getContainerID() == right.getContainerID()
|
||||
&& left.getPipelineID() == right.getPipelineID()
|
||||
&& left.getType() == right.getType()
|
||||
&& left.getProto().equals(right.getProto());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue