HBASE-27568 ChaosMonkey add support for JournalNodes (#4963)
Signed-off-by: Reid Chan <reidchan@apache.org>
commit da261344cc
parent cc54d22fb4
@@ -31,9 +31,10 @@ interface ClusterManager extends Configurable {
   /**
    * Type of the service daemon
    */
-  public static enum ServiceType {
+  enum ServiceType {
     HADOOP_NAMENODE("namenode"),
     HADOOP_DATANODE("datanode"),
+    HADOOP_JOURNALNODE("journalnode"),
     HADOOP_JOBTRACKER("jobtracker"),
     HADOOP_TASKTRACKER("tasktracker"),
     ZOOKEEPER_SERVER("QuorumPeerMain"),
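Note on the new constant: the string passed to each ServiceType is the daemon name a ClusterManager implementation uses to address the process on a remote host. A minimal sketch of turning that name into a start command (illustrative only; hadoopHome and the "hdfs --daemon" form are assumptions, not necessarily what HBaseClusterManager runs; getName() is the enum's existing accessor for the daemon name):

// Illustrative sketch, not HBase's actual HBaseClusterManager logic.
// Hadoop 3 exposes "hdfs --daemon start <name>"; "journalnode" is the
// daemon name carried by ServiceType.HADOOP_JOURNALNODE.
static String startCommand(String hadoopHome, ClusterManager.ServiceType service) {
  return String.format("%s/bin/hdfs --daemon start %s", hadoopHome, service.getName());
}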
@@ -58,7 +58,7 @@ public class DistributedHBaseCluster extends HBaseClusterInterface {
    * restarted instances of the same server will have different ServerName and will not coincide
    * with past dead ones. So there's no need to cleanup this list.
    */
-  private Set<ServerName> killedRegionServers = new HashSet<>();
+  private final Set<ServerName> killedRegionServers = new HashSet<>();

   public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
     throws IOException {
@@ -237,6 +237,37 @@ public class DistributedHBaseCluster extends HBaseClusterInterface {
     waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout);
   }

+  @Override
+  public void startJournalNode(ServerName serverName) throws IOException {
+    LOG.info("Starting journal node on: {}", serverName.getServerName());
+    clusterManager.start(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
+      serverName.getPort());
+  }
+
+  @Override
+  public void killJournalNode(ServerName serverName) throws IOException {
+    LOG.info("Aborting journal node on: {}", serverName.getServerName());
+    clusterManager.kill(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
+      serverName.getPort());
+  }
+
+  @Override
+  public void stopJournalNode(ServerName serverName) throws IOException {
+    LOG.info("Stopping journal node on: {}", serverName.getServerName());
+    clusterManager.stop(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
+      serverName.getPort());
+  }
+
+  @Override
+  public void waitForJournalNodeToStart(ServerName serverName, long timeout) throws IOException {
+    waitForServiceToStart(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
+  }
+
+  @Override
+  public void waitForJournalNodeToStop(ServerName serverName, long timeout) throws IOException {
+    waitForServiceToStop(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
+  }
+
   private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
     throws IOException {
     LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName());
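All five overrides above delegate to the ClusterManager SPI, which addresses a daemon by service type, hostname, and port. For orientation, the operations targeted here, with signatures inferred from the call sites in this hunk:

// ClusterManager operations the journalnode overrides delegate to
// (signatures inferred from the calls above).
void start(ServiceType service, String hostname, int port) throws IOException;
void stop(ServiceType service, String hostname, int port) throws IOException;
void kill(ServiceType service, String hostname, int port) throws IOException;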
@ -253,7 +284,7 @@ public class DistributedHBaseCluster extends HBaseClusterInterface {
|
||||||
|
|
||||||
private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
|
private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
LOG.info("Waiting for service: {} to start: ", service, serverName.getServerName());
|
LOG.info("Waiting for service: {} to start: {}", service, serverName.getServerName());
|
||||||
long start = EnvironmentEdgeManager.currentTime();
|
long start = EnvironmentEdgeManager.currentTime();
|
||||||
|
|
||||||
while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
|
while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
|
||||||
|
@@ -343,8 +374,7 @@ public class DistributedHBaseCluster extends HBaseClusterInterface {
     LOG.info("Restoring cluster - started");

     // do a best effort restore
-    boolean success = true;
-    success = restoreMasters(initial, current) && success;
+    boolean success = restoreMasters(initial, current);
     success = restoreRegionServers(initial, current) && success;
     success = restoreAdmin() && success;

@@ -260,18 +260,32 @@ public abstract class Action {
   }

   protected void killNameNode(ServerName server) throws IOException {
-    getLogger().info("Killing namenode :-{}", server.getHostname());
+    getLogger().info("Killing namenode {}", server.getHostname());
     cluster.killNameNode(server);
     cluster.waitForNameNodeToStop(server, killNameNodeTimeout);
-    getLogger().info("Killed namenode:{}. Reported num of rs:{}", server,
+    getLogger().info("Killed namenode {}. Reported num of rs:{}", server,
       cluster.getClusterMetrics().getLiveServerMetrics().size());
   }

   protected void startNameNode(ServerName server) throws IOException {
-    getLogger().info("Starting Namenode :-{}", server.getHostname());
+    getLogger().info("Starting namenode {}", server.getHostname());
     cluster.startNameNode(server);
     cluster.waitForNameNodeToStart(server, startNameNodeTimeout);
-    getLogger().info("Started namenode:{}", server);
+    getLogger().info("Started namenode {}", server);
+  }
+
+  protected void killJournalNode(ServerName server) throws IOException {
+    getLogger().info("Killing journalnode {}", server.getHostname());
+    cluster.killJournalNode(server);
+    cluster.waitForJournalNodeToStop(server, killNameNodeTimeout);
+    getLogger().info("Killed journalnode {}", server);
+  }
+
+  protected void startJournalNode(ServerName server) throws IOException {
+    getLogger().info("Starting journalnode {}", server.getHostname());
+    cluster.startJournalNode(server);
+    cluster.waitForJournalNodeToStart(server, startNameNodeTimeout);
+    getLogger().info("Started journalnode {}", server);
   }

   protected void unbalanceRegions(ClusterMetrics clusterStatus, List<ServerName> fromServers,
@@ -122,4 +122,17 @@ public abstract class RestartActionBaseAction extends Action {
     getLogger().info("Starting name node: {}", server);
     startNameNode(server);
   }
+
+  void restartJournalNode(ServerName server, long sleepTime) throws IOException {
+    sleepTime = Math.max(sleepTime, 1000);
+    // Don't try the kill if we're stopping
+    if (context.isStopping()) {
+      return;
+    }
+    getLogger().info("Killing journal node: {}", server);
+    killJournalNode(server);
+    sleep(sleepTime);
+    getLogger().info("Starting journal node: {}", server);
+    startJournalNode(server);
+  }
 }
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.chaos.actions;
+
+import java.util.Arrays;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RestartRandomJournalNodeAction extends RestartActionBaseAction {
+  private static final Logger LOG = LoggerFactory.getLogger(RestartRandomJournalNodeAction.class);
+
+  public RestartRandomJournalNodeAction(long sleepTime) {
+    super(sleepTime);
+  }
+
+  @Override
+  protected Logger getLogger() {
+    return LOG;
+  }
+
+  @Override
+  public void perform() throws Exception {
+    getLogger().info("Performing action: Restart random JournalNode");
+
+    final String qjournal;
+    try (final DistributedFileSystem dfs = HdfsActionUtils.createDfs(getConf())) {
+      final Configuration conf = dfs.getConf();
+      final String nameServiceID = DFSUtil.getNamenodeNameServiceId(conf);
+      if (!HAUtil.isHAEnabled(conf, nameServiceID)) {
+        getLogger().info("HA for HDFS is not enabled; skipping");
+        return;
+      }
+
+      qjournal = conf.get("dfs.namenode.shared.edits.dir");
+      if (StringUtils.isEmpty(qjournal)) {
+        getLogger().info("Empty qjournals!");
+        return;
+      }
+    }
+
+    final ServerName journalNode =
+      PolicyBasedChaosMonkey.selectRandomItem(getJournalNodes(qjournal));
+    restartJournalNode(journalNode, sleepTime);
+  }
+
+  private static ServerName[] getJournalNodes(final String qjournal) {
+    // WARNING: HDFS internals. qjournal looks like this:
+    // qjournal://journalnode-0.example.com:8485;...;journalnode-N.example.com:8485/hk8se
+    // When done, we have an array of journalnodes+ports: e.g. journalnode-0.example.com:8485
+    final String[] journalNodes =
+      qjournal.toLowerCase().replaceAll("qjournal:\\/\\/", "").replaceAll("\\/.*$", "").split(";");
+    return Arrays.stream(journalNodes).map(Address::fromString)
+      .map(addr -> ServerName.valueOf(addr.getHostName(), addr.getPort()))
+      .toArray(ServerName[]::new);
+  }
+}
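To see the string surgery in getJournalNodes() end to end, here is a standalone sketch using only the JDK; the sample URI, hostnames, and journal ID are made up:

// Standalone walkthrough of the parsing above; hosts and journal ID are invented.
import java.util.Arrays;

public class QjournalParseDemo {
  public static void main(String[] args) {
    String qjournal =
      "qjournal://jn0.example.com:8485;jn1.example.com:8485;jn2.example.com:8485/mycluster";
    String[] nodes = qjournal.toLowerCase()
      .replaceAll("qjournal:\\/\\/", "") // strip the scheme
      .replaceAll("\\/.*$", "")          // strip the trailing "/<journal id>"
      .split(";");                       // one host:port per journal node
    // Prints: [jn0.example.com:8485, jn1.example.com:8485, jn2.example.com:8485]
    System.out.println(Arrays.toString(nodes));
  }
}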
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.chaos.actions.GracefulRollingRestartRsAction;
 import org.apache.hadoop.hbase.chaos.actions.RestartActiveMasterAction;
 import org.apache.hadoop.hbase.chaos.actions.RestartActiveNameNodeAction;
 import org.apache.hadoop.hbase.chaos.actions.RestartRandomDataNodeAction;
+import org.apache.hadoop.hbase.chaos.actions.RestartRandomJournalNodeAction;
 import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
 import org.apache.hadoop.hbase.chaos.actions.RestartRandomZKNodeAction;
 import org.apache.hadoop.hbase.chaos.actions.RollingBatchRestartRsAction;
@@ -59,6 +60,7 @@ public class ServerAndDependenciesKillingMonkeyFactory extends MonkeyFactory {
       new ForceBalancerAction(),
       new RestartActiveNameNodeAction(60000),
       new RestartRandomDataNodeAction(60000),
+      new RestartRandomJournalNodeAction(60000),
       new RestartRandomZKNodeAction(60000),
       new GracefulRollingRestartRsAction(gracefulRollingRestartTSSLeepTime),
       new RollingBatchSuspendResumeRsAction(rollingBatchSuspendRSSleepTime,
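For orientation, a sketch of how the updated factory is typically obtained; it assumes MonkeyFactory's standard lookup-by-name registry, where this factory is registered under "serverAndDependenciesKilling", and that the registry key and builder methods match the HBase version in use:

// A minimal sketch, assuming the existing MonkeyFactory registry.
static ChaosMonkey buildDependencyKillingMonkey(IntegrationTestingUtility util) {
  return MonkeyFactory.getFactory("serverAndDependenciesKilling")
    .setUtil(util) // the IntegrationTestingUtility wrapping the target cluster
    .build();
}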
@@ -262,6 +262,41 @@ public abstract class HBaseClusterInterface implements Closeable, Configurable {
   public abstract void waitForNameNodeToStop(ServerName serverName, long timeout)
     throws IOException;

+  /**
+   * Starts a new journalnode on the given hostname or if this is a mini/local cluster, silently
+   * logs warning message.
+   * @throws IOException if something goes wrong
+   */
+  public abstract void startJournalNode(ServerName serverName) throws IOException;
+
+  /**
+   * Kills the journalnode process if this is a distributed cluster, otherwise, this causes master
+   * to exit doing basic clean up only.
+   * @throws IOException if something goes wrong
+   */
+  public abstract void killJournalNode(ServerName serverName) throws IOException;
+
+  /**
+   * Stops the journalnode if this is a distributed cluster, otherwise silently logs warning
+   * message.
+   * @throws IOException if something goes wrong
+   */
+  public abstract void stopJournalNode(ServerName serverName) throws IOException;
+
+  /**
+   * Wait for the specified journalnode to join the cluster
+   * @throws IOException if something goes wrong or timeout occurs
+   */
+  public abstract void waitForJournalNodeToStart(ServerName serverName, long timeout)
+    throws IOException;
+
+  /**
+   * Wait for the specified journalnode to stop
+   * @throws IOException if something goes wrong or timeout occurs
+   */
+  public abstract void waitForJournalNodeToStop(ServerName serverName, long timeout)
+    throws IOException;
+
   /**
    * Starts a new master on the given hostname or if this is a mini/local cluster, starts a master
    * locally.
@@ -372,6 +372,31 @@ public class SingleProcessHBaseCluster extends HBaseClusterInterface {
     LOG.warn("Waiting for namenodes to stop on mini cluster is not supported");
   }

+  @Override
+  public void startJournalNode(ServerName serverName) {
+    LOG.warn("Starting journalnodes on mini cluster is not supported");
+  }
+
+  @Override
+  public void killJournalNode(ServerName serverName) {
+    LOG.warn("Aborting journalnodes on mini cluster is not supported");
+  }
+
+  @Override
+  public void stopJournalNode(ServerName serverName) {
+    LOG.warn("Stopping journalnodes on mini cluster is not supported");
+  }
+
+  @Override
+  public void waitForJournalNodeToStart(ServerName serverName, long timeout) {
+    LOG.warn("Waiting for journalnodes to start on mini cluster is not supported");
+  }
+
+  @Override
+  public void waitForJournalNodeToStop(ServerName serverName, long timeout) {
+    LOG.warn("Waiting for journalnodes to stop on mini cluster is not supported");
+  }
+
   @Override
   public void startMaster(String hostname, int port) throws IOException {
     this.startMaster();