HDDS-1456. Stop the datanode, when any datanode statemachine state is… (#769)

Bharat Viswanadham, 2019-04-26 14:25:34 -07:00 (committed by GitHub)
parent 37582705fa
commit 43b2a4b77b
9 changed files with 121 additions and 36 deletions

HddsDatanodeService.java

@@ -56,6 +56,7 @@ import java.security.cert.CertificateException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;

 import static org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec.getX509Certificate;
 import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
@@ -84,6 +85,7 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
   private HddsDatanodeHttpServer httpServer;
   private boolean printBanner;
   private String[] args;
+  private volatile AtomicBoolean isStopped = new AtomicBoolean(false);

   public HddsDatanodeService(boolean printBanner, String[] args) {
     this.printBanner = printBanner;
@@ -209,7 +211,7 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
       initializeCertificateClient(conf);
     }
     datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
-        dnCertClient);
+        dnCertClient, this::terminateDatanode);
     try {
       httpServer = new HddsDatanodeHttpServer(conf);
       httpServer.start();
@@ -421,29 +423,37 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
     }
   }

+  public void terminateDatanode() {
+    stop();
+    terminate(1);
+  }
+
   @Override
   public void stop() {
-    if (plugins != null) {
-      for (ServicePlugin plugin : plugins) {
-        try {
-          plugin.stop();
-          LOG.info("Stopped plug-in {}", plugin);
-        } catch (Throwable t) {
-          LOG.warn("ServicePlugin {} could not be stopped", plugin, t);
+    if (!isStopped.get()) {
+      isStopped.set(true);
+      if (plugins != null) {
+        for (ServicePlugin plugin : plugins) {
+          try {
+            plugin.stop();
+            LOG.info("Stopped plug-in {}", plugin);
+          } catch (Throwable t) {
+            LOG.warn("ServicePlugin {} could not be stopped", plugin, t);
+          }
         }
       }
-    }
-    if (datanodeStateMachine != null) {
-      datanodeStateMachine.stopDaemon();
-    }
-    if (httpServer != null) {
-      try {
-        httpServer.stop();
-      } catch (Exception e) {
-        LOG.error("Stopping HttpServer is failed.", e);
+      if (datanodeStateMachine != null) {
+        datanodeStateMachine.stopDaemon();
+      }
+      if (httpServer != null) {
+        try {
+          httpServer.stop();
+        } catch (Exception e) {
+          LOG.error("Stopping HttpServer is failed.", e);
+        }
       }
     }
   }

   @Override
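
Note on the stop() rework above: terminateDatanode() gives the state machine a way to bring the whole process down (stop everything, then terminate(1)), and the new isStopped flag makes stop() idempotent, which matters now that it can be reached both from the normal shutdown hook and from the error path. A minimal standalone sketch of that idempotent-stop pattern (the class and method bodies here are illustrative, not from the patch):

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: the first caller performs the shutdown work and
// every later caller becomes a no-op. compareAndSet makes the
// check-then-act atomic; the patch itself uses a separate get()/set()
// pair, which is fine for a single shutdown path but not strictly
// race-free if two threads call stop() at the same instant.
public class IdempotentStopSketch {

  private final AtomicBoolean isStopped = new AtomicBoolean(false);

  public void stop() {
    if (isStopped.compareAndSet(false, true)) {
      System.out.println("stopping plugins, state machine, http server");
    }
  }

  public static void main(String[] args) {
    IdempotentStopSketch s = new IdempotentStopSketch();
    s.stop(); // runs the shutdown work
    s.stop(); // no-op
  }
}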

HddsDatanodeStopService.java (new file)

@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+/**
+ * Interface which declares a method to stop HddsDatanodeService.
+ */
+public interface HddsDatanodeStopService {
+
+  void stopService();
+}
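
Because HddsDatanodeStopService declares a single abstract method, a method reference is enough to implement it; that is how HddsDatanodeService passes this::terminateDatanode into the DatanodeStateMachine constructor above. A self-contained sketch of that wiring (the type and method names below are stand-ins, not the real ones):

// Stand-in for HddsDatanodeStopService: one abstract method, so any
// method reference with a matching signature satisfies it.
interface StopService {
  void stopService();
}

public class CallbackWiringSketch {

  private void terminateDatanode() {
    System.out.println("stop() followed by terminate(1)");
  }

  public static void main(String[] args) {
    CallbackWiringSketch service = new CallbackWiringSketch();
    // same shape as: new DatanodeStateMachine(..., this::terminateDatanode)
    StopService stopper = service::terminateDatanode;
    stopper.stopService(); // dispatches to terminateDatanode()
  }
}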

DatanodeStateMachine.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.ozone.HddsDatanodeStopService;
 import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CloseContainerCommandHandler;
@@ -84,6 +85,7 @@ public class DatanodeStateMachine implements Closeable {
   private JvmPauseMonitor jvmPauseMonitor;
   private CertificateClient dnCertClient;
+  private final HddsDatanodeStopService hddsDatanodeStopService;

   /**
    * Constructs a a datanode state machine.
@@ -93,7 +95,9 @@ public class DatanodeStateMachine implements Closeable {
    * enabled
    */
   public DatanodeStateMachine(DatanodeDetails datanodeDetails,
-      Configuration conf, CertificateClient certClient) throws IOException {
+      Configuration conf, CertificateClient certClient,
+      HddsDatanodeStopService hddsDatanodeStopService) throws IOException {
+    this.hddsDatanodeStopService = hddsDatanodeStopService;
     this.conf = conf;
     this.datanodeDetails = datanodeDetails;
     executorService = HadoopExecutors.newCachedThreadPool(
@@ -195,6 +199,14 @@ public class DatanodeStateMachine implements Closeable {
         LOG.error("Unable to finish the execution.", e);
       }
     }
+
+    // If we have got some exception in stateMachine we set the state to
+    // shutdown to stop the stateMachine thread. Along with this we should
+    // also stop the datanode.
+    if (context.getShutdownOnError()) {
+      LOG.error("DatanodeStateMachine Shutdown due to an critical error");
+      hddsDatanodeStopService.stopService();
+    }
   }

   /**
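
The behavioral core of this file's change: when the state-machine thread falls out of its run loop because the state reached SHUTDOWN, it now asks the context whether the shutdown was error-driven and, if so, calls back into the datanode service to stop the whole process. A reduced sketch of that control flow, with all types collapsed to stand-ins:

import java.util.concurrent.atomic.AtomicReference;

public class ShutdownFlowSketch {

  enum State { RUNNING, SHUTDOWN }

  interface StopService { void stopService(); }

  private final AtomicReference<State> state =
      new AtomicReference<>(State.RUNNING);
  private volatile boolean shutdownOnError = false;
  private final StopService stopService;

  ShutdownFlowSketch(StopService stopService) {
    this.stopService = stopService;
  }

  private void executeNextTask() {
    // stand-in for a task that fails: the machine moves to SHUTDOWN
    // and records that the transition was caused by an error
    state.set(State.SHUTDOWN);
    shutdownOnError = true;
  }

  public void start() {
    while (state.get() != State.SHUTDOWN) {
      executeNextTask();
    }
    // the loop has exited; if the exit was error-driven, escalate so the
    // process does not linger with a dead state machine inside it
    if (shutdownOnError) {
      stopService.stopService();
    }
  }

  public static void main(String[] args) {
    new ShutdownFlowSketch(() -> System.out.println("terminating datanode"))
        .start();
  }
}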

StateContext.java

@@ -73,6 +73,7 @@ public class StateContext {
   private final Queue<ContainerAction> containerActions;
   private final Queue<PipelineAction> pipelineActions;
   private DatanodeStateMachine.DatanodeStates state;
+  private boolean shutdownOnError = false;

   /**
    * Starting with a 2 sec heartbeat frequency which will be updated to the
@@ -152,6 +153,22 @@ public class StateContext {
     this.state = state;
   }

+  /**
+   * Sets the shutdownOnError. This method needs to be called when we
+   * set DatanodeState to SHUTDOWN when executing a task of a DatanodeState.
+   * @param value
+   */
+  private void setShutdownOnError(boolean value) {
+    this.shutdownOnError = value;
+  }
+
+  /**
+   * Get shutdownStateMachine.
+   * @return boolean
+   */
+  public boolean getShutdownOnError() {
+    return shutdownOnError;
+  }
+
   /**
    * Adds the report to report queue.
    *
@@ -367,6 +384,14 @@ public class StateContext {
       }
       this.setState(newState);
     }
+
+    if (this.state == DatanodeStateMachine.DatanodeStates.SHUTDOWN) {
+      LOG.error("Critical error occurred in StateMachine, setting " +
+          "shutDownMachine");
+      // When some exception occurred, set shutdownStateMachine to true, so
+      // that we can terminate the datanode.
+      setShutdownOnError(true);
+    }
   }
 }
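
Worth noting about the accessors added above: the setter is private while the getter is public, so only StateContext itself can raise the error flag (it does so in execute(), when it observes the transition to SHUTDOWN), and DatanodeStateMachine can only read it through getShutdownOnError().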

VolumeSet.java

@@ -167,8 +167,6 @@ public class VolumeSet {
           checkAndSetClusterID(hddsVolume.getClusterID());

-          volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
-          volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
           LOG.info("Added Volume : {} to VolumeSet",
               hddsVolume.getHddsRootDir().getPath());
@@ -177,6 +175,8 @@ public class VolumeSet {
             throw new IOException("Failed to create HDDS storage dir " +
                 hddsVolume.getHddsRootDir());
           }
+          volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
+          volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
       } catch (IOException e) {
         HddsVolume volume = new HddsVolume.Builder(locationString)
             .failedVolume(true).build();
@@ -185,12 +185,14 @@ public class VolumeSet {
       }
     }

-    checkAllVolumes();
-
+    // First checking if we have any volumes, if all volumes are failed the
+    // volumeMap size will be zero, and we throw Exception.
     if (volumeMap.size() == 0) {
       throw new DiskOutOfSpaceException("No storage locations configured");
     }

+    checkAllVolumes();
+
     // Ensure volume threads are stopped and scm df is saved during shutdown.
     shutdownHook = () -> {
       saveVolumeSetUsed();
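
Two ordering fixes land in this file. A volume is now registered in volumeMap only after its storage directory has been created successfully, and the empty-map check runs before checkAllVolumes(), so a configuration in which every location fails produces a clear DiskOutOfSpaceException instead of handing an empty set to the checker. A reduced sketch of the resulting initialization order (plain Java stand-ins, not the real VolumeSet types):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class VolumeInitOrderSketch {

  static List<String> initVolumes(List<String> locations) throws IOException {
    List<String> volumeMap = new ArrayList<>();
    for (String location : locations) {
      try {
        ensureStorageDir(location); // may throw; failed volumes never register
        volumeMap.add(location);    // register only after the dir exists
      } catch (IOException e) {
        // track as a failed volume and keep going with the rest
      }
    }
    // fail fast before any further checks when nothing registered at all
    if (volumeMap.isEmpty()) {
      throw new IOException("No storage locations configured");
    }
    return volumeMap; // checkAllVolumes() would run after this point
  }

  static void ensureStorageDir(String location) throws IOException {
    if (location.isEmpty()) {
      throw new IOException("Failed to create storage dir " + location);
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(initVolumes(List.of("data1", "data2")));
  }
}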

TestDatanodeStateMachine.java

@@ -160,7 +160,7 @@ public class TestDatanodeStateMachine {
   public void testStartStopDatanodeStateMachine() throws IOException,
       InterruptedException, TimeoutException {
     try (DatanodeStateMachine stateMachine =
-        new DatanodeStateMachine(getNewDatanodeDetails(), conf, null)) {
+        new DatanodeStateMachine(getNewDatanodeDetails(), conf, null, null)) {
       stateMachine.startDaemon();
       SCMConnectionManager connectionManager =
           stateMachine.getConnectionManager();
@@ -222,7 +222,7 @@ public class TestDatanodeStateMachine {
     ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
     try (DatanodeStateMachine stateMachine =
-        new DatanodeStateMachine(datanodeDetails, conf, null)) {
+        new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -343,7 +343,7 @@ public class TestDatanodeStateMachine {
     datanodeDetails.setPort(port);
     try (DatanodeStateMachine stateMachine =
-        new DatanodeStateMachine(datanodeDetails, conf, null)) {
+        new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -406,7 +406,7 @@ public class TestDatanodeStateMachine {
       perTestConf.setStrings(entry.getKey(), entry.getValue());
       LOG.info("Test with {} = {}", entry.getKey(), entry.getValue());
       try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-          getNewDatanodeDetails(), perTestConf, null)) {
+          getNewDatanodeDetails(), perTestConf, null, null)) {
         DatanodeStateMachine.DatanodeStates currentState =
             stateMachine.getContext().getState();
         Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
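
All four constructor call sites here simply pass null for the new HddsDatanodeStopService parameter, mirroring the null already passed for the CertificateClient; these tests never drive the error path, so the callback should never be invoked.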

TestVolumeSetDiskChecks.java

@@ -38,6 +38,8 @@ import org.apache.curator.shaded.com.google.common.collect.ImmutableSet;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.hamcrest.CoreMatchers.is;
 import org.junit.After;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import org.junit.Rule;
@@ -125,14 +127,14 @@ public class TestVolumeSetDiskChecks {
   }

   /**
-   * Verify that initialization fails if all volumes are bad.
+   * Verify that all volumes are added to fail list if all volumes are bad.
    */
   @Test
   public void testAllVolumesAreBad() throws IOException {
     final int numVolumes = 5;

     conf = getConfWithDataNodeDirs(numVolumes);
-    thrown.expect(IOException.class);
+
     final VolumeSet volumeSet = new VolumeSet(
         UUID.randomUUID().toString(), conf) {
       @Override
@@ -141,6 +143,9 @@ public class TestVolumeSetDiskChecks {
         return new DummyChecker(configuration, new Timer(), numVolumes);
       }
     };
+
+    assertEquals(volumeSet.getFailedVolumesList().size(), numVolumes);
+    assertEquals(volumeSet.getVolumesList().size(), 0);
   }

   /**
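
This test change follows from the VolumeSet reordering above: with the empty-map check now running before checkAllVolumes(), volumes that register successfully but then fail the disk check no longer abort construction with an IOException. They are moved to the failed list instead, which is exactly what the rewritten assertions verify.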

TestEndPoint.java

@@ -175,6 +175,10 @@ public class TestEndPoint {
   @Test
   public void testCheckVersionResponse() throws Exception {
     OzoneConfiguration conf = SCMTestUtils.getConf();
+    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+        true);
+    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
+        true);
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         serverAddress, 1000)) {
       GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
@@ -478,7 +482,7 @@ public class TestEndPoint {
     // Create a datanode state machine for stateConext used by endpoint task
     try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-        TestUtils.randomDatanodeDetails(), conf, null);
+        TestUtils.randomDatanodeDetails(), conf, null, null);
         EndpointStateMachine rpcEndPoint =
             createEndpoint(conf, scmAddress, rpcTimeout)) {
       HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
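
The two new random-port settings at the top of the file presumably keep testCheckVersionResponse from colliding with fixed container and Ratis IPC ports when tests run in parallel; the remaining change just adapts the call site to the widened DatanodeStateMachine constructor.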

TestMiniOzoneCluster.java

@@ -175,11 +175,11 @@ public class TestMiniOzoneCluster {
         true);
     try (
         DatanodeStateMachine sm1 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm2 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm3 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null)
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null)
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
       assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
@@ -198,11 +198,11 @@ public class TestMiniOzoneCluster {
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
     try (
         DatanodeStateMachine sm1 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm2 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm3 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf, null)
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
       assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
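
One quirk worth flagging in the last hunk: the second try block now ends its resource list with `null, null);` followed by the closing parenthesis on the next line. That reads oddly but compiles, because Java permits a trailing semicolon after the final resource in a try-with-resources statement.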