HDDS-1285. Implement actions need to be taken after chill mode exit wait time. (#612)
This commit is contained in:
parent
abace709cc
commit
a458c5bd96
|
@ -21,11 +21,17 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.block.BlockManager;
|
||||
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
|
||||
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus;
|
||||
import org.apache.hadoop.hdds.server.events.EventHandler;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -36,12 +42,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
*/
|
||||
public class ChillModeHandler implements EventHandler<ChillModeStatus> {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ChillModeHandler.class);
|
||||
|
||||
private final SCMClientProtocolServer scmClientProtocolServer;
|
||||
private final BlockManager scmBlockManager;
|
||||
private final long waitTime;
|
||||
private final AtomicBoolean isInChillMode = new AtomicBoolean(true);
|
||||
private final ReplicationManager replicationManager;
|
||||
|
||||
private final PipelineManager scmPipelineManager;
|
||||
|
||||
/**
|
||||
* ChillModeHandler, to handle the logic once we exit chill mode.
|
||||
|
@ -53,13 +63,15 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
|
|||
public ChillModeHandler(Configuration configuration,
|
||||
SCMClientProtocolServer clientProtocolServer,
|
||||
BlockManager blockManager,
|
||||
ReplicationManager replicationManager) {
|
||||
ReplicationManager replicationManager, PipelineManager pipelineManager) {
|
||||
Objects.requireNonNull(configuration, "Configuration cannot be null");
|
||||
Objects.requireNonNull(clientProtocolServer, "SCMClientProtocolServer " +
|
||||
"object cannot be null");
|
||||
Objects.requireNonNull(blockManager, "BlockManager object cannot be null");
|
||||
Objects.requireNonNull(replicationManager, "ReplicationManager " +
|
||||
"object cannot be null");
|
||||
Objects.requireNonNull(pipelineManager, "PipelineManager object cannot " +
|
||||
"be" + "null");
|
||||
this.waitTime = configuration.getTimeDuration(
|
||||
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT,
|
||||
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT_DEFAULT,
|
||||
|
@ -67,6 +79,7 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
|
|||
this.scmClientProtocolServer = clientProtocolServer;
|
||||
this.scmBlockManager = blockManager;
|
||||
this.replicationManager = replicationManager;
|
||||
this.scmPipelineManager = pipelineManager;
|
||||
|
||||
final boolean chillModeEnabled = configuration.getBoolean(
|
||||
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
|
||||
|
@ -75,6 +88,8 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
|
|||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Set ChillMode status based on
|
||||
* {@link org.apache.hadoop.hdds.scm.events.SCMEvents#CHILL_MODE_STATUS}.
|
||||
|
@ -101,6 +116,7 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
|
|||
Thread.currentThread().interrupt();
|
||||
}
|
||||
replicationManager.start();
|
||||
cleanupPipelines();
|
||||
});
|
||||
|
||||
chillModeExitThread.setDaemon(true);
|
||||
|
@ -109,6 +125,20 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
|
|||
|
||||
}
|
||||
|
||||
private void cleanupPipelines() {
|
||||
List<Pipeline> pipelineList = scmPipelineManager.getPipelines();
|
||||
pipelineList.forEach((pipeline) -> {
|
||||
try {
|
||||
if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
|
||||
scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Finalize and destroy pipeline failed for pipeline "
|
||||
+ pipeline.toString(), ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public boolean getChillModeStatus() {
|
||||
return isInChillMode.get();
|
||||
}
|
||||
|
|
|
@ -335,7 +335,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
httpServer = new StorageContainerManagerHttpServer(conf);
|
||||
|
||||
chillModeHandler = new ChillModeHandler(configuration,
|
||||
clientProtocolServer, scmBlockManager, replicationManager);
|
||||
clientProtocolServer, scmBlockManager, replicationManager,
|
||||
pipelineManager);
|
||||
|
||||
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
|
||||
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
|
||||
|
@ -1077,6 +1078,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
return chillModeHandler;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public SCMChillModeManager getScmChillModeManager() {
|
||||
return scmChillModeManager;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ReplicationManager getReplicationManager() {
|
||||
return replicationManager;
|
||||
}
|
||||
|
||||
public void checkAdminAccess(String remoteUser) throws IOException {
|
||||
if (remoteUser != null) {
|
||||
if (!scmAdminUsernames.contains(remoteUser)) {
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.hadoop.hdds.scm.container.ReplicationManager;
|
|||
import org.apache.hadoop.hdds.scm.container.placement.algorithms
|
||||
.ContainerPlacementPolicy;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
|
||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
@ -49,6 +51,7 @@ public class TestChillModeHandler {
|
|||
private ChillModeHandler chillModeHandler;
|
||||
private EventQueue eventQueue;
|
||||
private SCMChillModeManager.ChillModeStatus chillModeStatus;
|
||||
private PipelineManager scmPipelineManager;
|
||||
|
||||
public void setup(boolean enabled) {
|
||||
configuration = new OzoneConfiguration();
|
||||
|
@ -66,10 +69,11 @@ public class TestChillModeHandler {
|
|||
replicationManager = new ReplicationManager(configuration,
|
||||
containerManager, Mockito.mock(ContainerPlacementPolicy.class),
|
||||
eventQueue);
|
||||
scmPipelineManager = Mockito.mock(SCMPipelineManager.class);
|
||||
blockManager = Mockito.mock(BlockManagerImpl.class);
|
||||
chillModeHandler =
|
||||
new ChillModeHandler(configuration, scmClientProtocolServer,
|
||||
blockManager, replicationManager);
|
||||
blockManager, replicationManager, scmPipelineManager);
|
||||
|
||||
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
|
||||
chillModeStatus = new SCMChillModeManager.ChillModeStatus(false);
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler;
|
|||
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
|
||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.junit.After;
|
||||
|
@ -50,8 +52,10 @@ public class TestSCMClientProtocolServer {
|
|||
BlockManager blockManager = Mockito.mock(BlockManagerImpl.class);
|
||||
ReplicationManager replicationManager =
|
||||
Mockito.mock(ReplicationManager.class);
|
||||
PipelineManager pipelineManager = Mockito.mock(SCMPipelineManager.class);
|
||||
ChillModeHandler chillModeHandler = new ChillModeHandler(config,
|
||||
scmClientProtocolServer, blockManager, replicationManager);
|
||||
scmClientProtocolServer, blockManager, replicationManager,
|
||||
pipelineManager);
|
||||
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,202 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdds.scm.chillmode;
|
||||
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* This class tests SCM Chill mode with pipeline rules.
|
||||
*/
|
||||
|
||||
public class TestSCMChillModeWithPipelineRules {
|
||||
|
||||
private static MiniOzoneCluster cluster;
|
||||
private OzoneConfiguration conf = new OzoneConfiguration();
|
||||
private PipelineManager pipelineManager;
|
||||
private MiniOzoneCluster.Builder clusterBuilder;
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
public void setup(int numDatanodes) throws Exception {
|
||||
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
|
||||
temporaryFolder.newFolder().toString());
|
||||
conf.setBoolean(
|
||||
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK,
|
||||
true);
|
||||
conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT, "10s");
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
|
||||
clusterBuilder = MiniOzoneCluster.newBuilder(conf)
|
||||
.setNumDatanodes(numDatanodes)
|
||||
.setHbInterval(1000)
|
||||
.setHbProcessorInterval(1000);
|
||||
|
||||
cluster = clusterBuilder.build();
|
||||
cluster.waitForClusterToBeReady();
|
||||
StorageContainerManager scm = cluster.getStorageContainerManager();
|
||||
pipelineManager = scm.getPipelineManager();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testScmChillMode() throws Exception {
|
||||
|
||||
int datanodeCount = 6;
|
||||
setup(datanodeCount);
|
||||
|
||||
waitForRatis3NodePipelines(datanodeCount/3);
|
||||
waitForRatis1NodePipelines(datanodeCount);
|
||||
|
||||
int totalPipelineCount = datanodeCount + (datanodeCount/3);
|
||||
|
||||
//Cluster is started successfully
|
||||
cluster.stop();
|
||||
|
||||
cluster.restartOzoneManager();
|
||||
cluster.restartStorageContainerManager(false);
|
||||
|
||||
pipelineManager = cluster.getStorageContainerManager().getPipelineManager();
|
||||
List<Pipeline> pipelineList =
|
||||
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
|
||||
HddsProtos.ReplicationFactor.THREE);
|
||||
|
||||
|
||||
pipelineList.get(0).getNodes().forEach(datanodeDetails -> {
|
||||
try {
|
||||
cluster.restartHddsDatanode(datanodeDetails, false);
|
||||
} catch (Exception ex) {
|
||||
fail("Datanode restart failed");
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
SCMChillModeManager scmChillModeManager =
|
||||
cluster.getStorageContainerManager().getScmChillModeManager();
|
||||
|
||||
|
||||
// Ceil(0.1 * 2) is 1, as one pipeline is healthy healthy pipeline rule is
|
||||
// satisfied
|
||||
|
||||
GenericTestUtils.waitFor(() ->
|
||||
scmChillModeManager.getHealthyPipelineChillModeRule()
|
||||
.validate(), 1000, 60000);
|
||||
|
||||
// As Ceil(0.9 * 2) is 2, and from second pipeline no datanodes's are
|
||||
// reported this rule is not met yet.
|
||||
GenericTestUtils.waitFor(() ->
|
||||
!scmChillModeManager.getOneReplicaPipelineChillModeRule()
|
||||
.validate(), 1000, 60000);
|
||||
|
||||
Assert.assertTrue(cluster.getStorageContainerManager().isInChillMode());
|
||||
|
||||
DatanodeDetails restartedDatanode = pipelineList.get(1).getFirstNode();
|
||||
// Now restart one datanode from the 2nd pipeline
|
||||
try {
|
||||
cluster.restartHddsDatanode(restartedDatanode, false);
|
||||
} catch (Exception ex) {
|
||||
fail("Datanode restart failed");
|
||||
}
|
||||
|
||||
|
||||
GenericTestUtils.waitFor(() ->
|
||||
scmChillModeManager.getOneReplicaPipelineChillModeRule()
|
||||
.validate(), 1000, 60000);
|
||||
|
||||
GenericTestUtils.waitFor(() -> !scmChillModeManager.getInChillMode(), 1000,
|
||||
60000);
|
||||
|
||||
// As after chillmode wait time is not completed, we should have total
|
||||
// pipeline's as original count 6(1 node pipelines) + 2 (3 node pipeline)
|
||||
Assert.assertEquals(totalPipelineCount,
|
||||
pipelineManager.getPipelines().size());
|
||||
|
||||
ReplicationManager replicationManager =
|
||||
cluster.getStorageContainerManager().getReplicationManager();
|
||||
|
||||
GenericTestUtils.waitFor(() ->
|
||||
replicationManager.isRunning(), 1000, 60000);
|
||||
|
||||
|
||||
// As 4 datanodes are reported, 4 single node pipeline and 1 3 node
|
||||
// pipeline.
|
||||
|
||||
waitForRatis1NodePipelines(4);
|
||||
waitForRatis3NodePipelines(1);
|
||||
|
||||
// Restart other datanodes in the pipeline, and after some time we should
|
||||
// have same count as original.
|
||||
pipelineList.get(1).getNodes().forEach(datanodeDetails -> {
|
||||
try {
|
||||
if (!restartedDatanode.equals(datanodeDetails)) {
|
||||
cluster.restartHddsDatanode(datanodeDetails, false);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
fail("Datanode restart failed");
|
||||
}
|
||||
});
|
||||
|
||||
waitForRatis1NodePipelines(datanodeCount);
|
||||
waitForRatis3NodePipelines(datanodeCount/3);
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void waitForRatis3NodePipelines(int numPipelines)
|
||||
throws TimeoutException, InterruptedException {
|
||||
GenericTestUtils.waitFor(() -> pipelineManager
|
||||
.getPipelines(HddsProtos.ReplicationType.RATIS,
|
||||
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
|
||||
.size() == numPipelines, 100, 60000);
|
||||
}
|
||||
|
||||
private void waitForRatis1NodePipelines(int numPipelines)
|
||||
throws TimeoutException, InterruptedException {
|
||||
GenericTestUtils.waitFor(() -> pipelineManager
|
||||
.getPipelines(HddsProtos.ReplicationType.RATIS,
|
||||
HddsProtos.ReplicationFactor.ONE, Pipeline.PipelineState.OPEN)
|
||||
.size() == numPipelines, 100, 60000);
|
||||
}
|
||||
}
|
|
@ -140,7 +140,7 @@ public class TestContainerStateManagerIntegration {
|
|||
}
|
||||
}
|
||||
|
||||
cluster.restartStorageContainerManager();
|
||||
cluster.restartStorageContainerManager(true);
|
||||
|
||||
List<ContainerInfo> result = cluster.getStorageContainerManager()
|
||||
.getContainerManager().listContainer(null, 100);
|
||||
|
|
|
@ -75,7 +75,7 @@ public class TestSCMRestart {
|
|||
// At this stage, there should be 2 pipeline one with 1 open container
|
||||
// each. Try restarting the SCM and then discover that pipeline are in
|
||||
// correct state.
|
||||
cluster.restartStorageContainerManager();
|
||||
cluster.restartStorageContainerManager(true);
|
||||
newContainerManager = cluster.getStorageContainerManager()
|
||||
.getContainerManager();
|
||||
pipelineManager = cluster.getStorageContainerManager().getPipelineManager();
|
||||
|
|
|
@ -159,12 +159,14 @@ public interface MiniOzoneCluster {
|
|||
/**
|
||||
* Restarts StorageContainerManager instance.
|
||||
*
|
||||
* @param waitForDatanode
|
||||
* @throws IOException
|
||||
* @throws TimeoutException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void restartStorageContainerManager() throws InterruptedException,
|
||||
TimeoutException, IOException, AuthenticationException;
|
||||
void restartStorageContainerManager(boolean waitForDatanode)
|
||||
throws InterruptedException, TimeoutException, IOException,
|
||||
AuthenticationException;
|
||||
|
||||
/**
|
||||
* Restarts OzoneManager instance.
|
||||
|
|
|
@ -250,14 +250,16 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void restartStorageContainerManager()
|
||||
public void restartStorageContainerManager(boolean waitForDatanode)
|
||||
throws TimeoutException, InterruptedException, IOException,
|
||||
AuthenticationException {
|
||||
scm.stop();
|
||||
scm.join();
|
||||
scm = StorageContainerManager.createSCM(null, conf);
|
||||
scm.start();
|
||||
waitForClusterToBeReady();
|
||||
if (waitForDatanode) {
|
||||
waitForClusterToBeReady();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -260,7 +260,7 @@ public class TestMiniOzoneCluster {
|
|||
|
||||
// DN should successfully register with the SCM after SCM is restarted.
|
||||
// Restart the SCM
|
||||
cluster.restartStorageContainerManager();
|
||||
cluster.restartStorageContainerManager(true);
|
||||
// Wait for DN to register
|
||||
cluster.waitForClusterToBeReady();
|
||||
// DN should be in HEARTBEAT state after registering with the SCM
|
||||
|
|
|
@ -342,7 +342,7 @@ public class TestScmChillMode {
|
|||
assertFalse(scm.isInChillMode());
|
||||
|
||||
// Even on SCM restart, cluster should be out of chill mode immediately.
|
||||
cluster.restartStorageContainerManager();
|
||||
cluster.restartStorageContainerManager(true);
|
||||
assertFalse(scm.isInChillMode());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue