HBASE-24260 Add a ClusterManager that issues commands via coprocessor
Implements `ClusterManager` that relies on the new `ShellExecEndpointCoprocessor` for remote shell command execution. Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
parent
7313b4f5aa
commit
e37aafcfc2
|
@ -23,7 +23,6 @@ import java.io.IOException;
|
|||
import java.util.Collections;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
|
||||
|
||||
/**
|
||||
|
|
|
@ -256,7 +256,17 @@
|
|||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>compile</scope>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-library</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.hbase.client.AsyncAdmin;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecService;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Overrides commands to make use of coprocessor where possible. Only supports actions taken
|
||||
* against Master and Region Server hosts.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@SuppressWarnings("unused") // no way to test this without a distributed cluster.
|
||||
public class CoprocClusterManager extends HBaseClusterManager {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(CoprocClusterManager.class);
|
||||
private static final Set<ServiceType> supportedServices = buildSupportedServicesSet();
|
||||
|
||||
@Override
|
||||
protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
|
||||
throws IOException {
|
||||
if (!supportedServices.contains(service)) {
|
||||
throw unsupportedServiceType(service);
|
||||
}
|
||||
|
||||
// We only support actions vs. Master or Region Server processes. We're issuing those actions
|
||||
// via the coprocessor that's running within those processes. Thus, there's no support for
|
||||
// honoring the configured service user.
|
||||
final String command = StringUtils.join(cmd, " ");
|
||||
LOG.info("Executing remote command: {}, hostname:{}", command, hostname);
|
||||
|
||||
try (final AsyncConnection conn = ConnectionFactory.createAsyncConnection(getConf()).join()) {
|
||||
final AsyncAdmin admin = conn.getAdmin();
|
||||
final ShellExecRequest req = ShellExecRequest.newBuilder()
|
||||
.setCommand(command)
|
||||
.setAwaitResponse(false)
|
||||
.build();
|
||||
|
||||
final ShellExecResponse resp;
|
||||
switch(service) {
|
||||
case HBASE_MASTER:
|
||||
// What happens if the intended action was killing a backup master? Right now we have
|
||||
// no `RestartBackupMasterAction` so it's probably fine.
|
||||
resp = masterExec(admin, req);
|
||||
break;
|
||||
case HBASE_REGIONSERVER:
|
||||
final ServerName targetHost = resolveRegionServerName(admin, hostname);
|
||||
resp = regionServerExec(admin, req, targetHost);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("should not happen");
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Executed remote command: {}, exit code:{} , output:{}", command, resp.getExitCode(),
|
||||
resp.getStdout());
|
||||
} else {
|
||||
LOG.info("Executed remote command: {}, exit code:{}", command, resp.getExitCode());
|
||||
}
|
||||
return new Pair<>(resp.getExitCode(), resp.getStdout());
|
||||
}
|
||||
}
|
||||
|
||||
private static Set<ServiceType> buildSupportedServicesSet() {
|
||||
final Set<ServiceType> set = new HashSet<>();
|
||||
set.add(ServiceType.HBASE_MASTER);
|
||||
set.add(ServiceType.HBASE_REGIONSERVER);
|
||||
return Collections.unmodifiableSet(set);
|
||||
}
|
||||
|
||||
private static ShellExecResponse masterExec(final AsyncAdmin admin,
|
||||
final ShellExecRequest req) {
|
||||
// TODO: Admin API provides no means of sending exec to a backup master.
|
||||
return admin.<ShellExecService.Stub, ShellExecResponse>coprocessorService(
|
||||
ShellExecService::newStub,
|
||||
(stub, controller, callback) -> stub.shellExec(controller, req, callback))
|
||||
.join();
|
||||
}
|
||||
|
||||
private static ShellExecResponse regionServerExec(final AsyncAdmin admin,
|
||||
final ShellExecRequest req, final ServerName targetHost) {
|
||||
return admin.<ShellExecService.Stub, ShellExecResponse>coprocessorService(
|
||||
ShellExecService::newStub,
|
||||
(stub, controller, callback) -> stub.shellExec(controller, req, callback),
|
||||
targetHost)
|
||||
.join();
|
||||
}
|
||||
|
||||
private static ServerName resolveRegionServerName(final AsyncAdmin admin,
|
||||
final String hostname) {
|
||||
return admin.getRegionServers()
|
||||
.thenApply(names -> names.stream()
|
||||
.filter(sn -> Objects.equals(sn.getHostname(), hostname))
|
||||
.findAny())
|
||||
.join()
|
||||
.orElseThrow(() -> serverNotFound(hostname));
|
||||
}
|
||||
|
||||
private static RuntimeException serverNotFound(final String hostname) {
|
||||
return new RuntimeException(
|
||||
String.format("Did not find %s amongst the servers known to the client.", hostname));
|
||||
}
|
||||
|
||||
private static RuntimeException unsupportedServiceType(final ServiceType serviceType) {
|
||||
return new RuntimeException(
|
||||
String.format("Unable to service request for service=%s", serviceType));
|
||||
}
|
||||
}
|
|
@ -22,7 +22,6 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
|
@ -45,9 +44,12 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HBaseClusterManager extends Configured implements ClusterManager {
|
||||
private static final String SIGKILL = "SIGKILL";
|
||||
private static final String SIGSTOP = "SIGSTOP";
|
||||
private static final String SIGCONT = "SIGCONT";
|
||||
|
||||
protected enum Signal {
|
||||
SIGKILL,
|
||||
SIGSTOP,
|
||||
SIGCONT,
|
||||
}
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
|
||||
private String sshUserName;
|
||||
|
@ -107,7 +109,7 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
|
|||
.setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
|
||||
}
|
||||
|
||||
private String getServiceUser(ServiceType service) {
|
||||
protected String getServiceUser(ServiceType service) {
|
||||
Configuration conf = getConf();
|
||||
switch (service) {
|
||||
case HADOOP_DATANODE:
|
||||
|
@ -329,9 +331,9 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
|
|||
* @return pair of exit code and command output
|
||||
* @throws IOException if something goes wrong.
|
||||
*/
|
||||
private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
|
||||
protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
|
||||
throws IOException {
|
||||
LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
|
||||
LOG.info("Executing remote command: {}, hostname:{}", StringUtils.join(cmd, " "),
|
||||
hostname);
|
||||
|
||||
RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
|
||||
|
@ -444,8 +446,9 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
|
|||
exec(hostname, service, Operation.RESTART);
|
||||
}
|
||||
|
||||
public void signal(ServiceType service, String signal, String hostname) throws IOException {
|
||||
execWithRetries(hostname, service, getCommandProvider(service).signalCommand(service, signal));
|
||||
public void signal(ServiceType service, Signal signal, String hostname) throws IOException {
|
||||
execWithRetries(hostname, service,
|
||||
getCommandProvider(service).signalCommand(service, signal.toString()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -457,16 +460,16 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
|
|||
|
||||
@Override
|
||||
public void kill(ServiceType service, String hostname, int port) throws IOException {
|
||||
signal(service, SIGKILL, hostname);
|
||||
signal(service, Signal.SIGKILL, hostname);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void suspend(ServiceType service, String hostname, int port) throws IOException {
|
||||
signal(service, SIGSTOP, hostname);
|
||||
signal(service, Signal.SIGSTOP, hostname);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume(ServiceType service, String hostname, int port) throws IOException {
|
||||
signal(service, SIGCONT, hostname);
|
||||
signal(service, Signal.SIGCONT, hostname);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -147,6 +147,7 @@ public class IntegrationTestingUtility extends HBaseTestingUtility {
|
|||
HConstants.HBASE_DIR, conf.get(HConstants.HBASE_DIR));
|
||||
Class<? extends ClusterManager> clusterManagerClass = conf.getClass(HBASE_CLUSTER_MANAGER_CLASS,
|
||||
DEFAULT_HBASE_CLUSTER_MANAGER_CLASS, ClusterManager.class);
|
||||
LOG.info("Instantiating {}", clusterManagerClass.getName());
|
||||
ClusterManager clusterManager = ReflectionUtils.newInstance(
|
||||
clusterManagerClass, conf);
|
||||
setHBaseCluster(new DistributedHBaseCluster(conf, clusterManager));
|
||||
|
|
|
@ -74,6 +74,8 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
"hbase.it.clustermanager.restapi.password";
|
||||
private static final String REST_API_CLUSTER_MANAGER_CLUSTER_NAME =
|
||||
"hbase.it.clustermanager.restapi.clustername";
|
||||
private static final String REST_API_DELEGATE_CLUSTER_MANAGER =
|
||||
"hbase.it.clustermanager.restapi.delegate";
|
||||
|
||||
private static final JsonParser parser = new JsonParser();
|
||||
|
||||
|
@ -86,8 +88,6 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
// Fields for the hostname, username, password, and cluster name of the cluster management server
|
||||
// to be used.
|
||||
private String serverHostname;
|
||||
private String serverUsername;
|
||||
private String serverPassword;
|
||||
private String clusterName;
|
||||
|
||||
// Each version of Cloudera Manager supports a particular API versions. Version 6 of this API
|
||||
|
@ -103,10 +103,7 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RESTApiClusterManager.class);
|
||||
|
||||
RESTApiClusterManager() {
|
||||
hBaseClusterManager = ReflectionUtils.newInstance(HBaseClusterManager.class,
|
||||
new IntegrationTestingUtility().getConfiguration());
|
||||
}
|
||||
RESTApiClusterManager() { }
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
|
@ -115,12 +112,17 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
// `Configured()` constructor calls `setConf(null)` before calling again with a real value.
|
||||
return;
|
||||
}
|
||||
|
||||
final Class<? extends ClusterManager> clazz = conf.getClass(REST_API_DELEGATE_CLUSTER_MANAGER,
|
||||
HBaseClusterManager.class, ClusterManager.class);
|
||||
hBaseClusterManager = ReflectionUtils.newInstance(clazz, conf);
|
||||
|
||||
serverHostname = conf.get(REST_API_CLUSTER_MANAGER_HOSTNAME, DEFAULT_SERVER_HOSTNAME);
|
||||
serverUsername = conf.get(REST_API_CLUSTER_MANAGER_USERNAME, DEFAULT_SERVER_USERNAME);
|
||||
serverPassword = conf.get(REST_API_CLUSTER_MANAGER_PASSWORD, DEFAULT_SERVER_PASSWORD);
|
||||
clusterName = conf.get(REST_API_CLUSTER_MANAGER_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
|
||||
|
||||
// Add filter to Client instance to enable server authentication.
|
||||
String serverUsername = conf.get(REST_API_CLUSTER_MANAGER_USERNAME, DEFAULT_SERVER_USERNAME);
|
||||
String serverPassword = conf.get(REST_API_CLUSTER_MANAGER_PASSWORD, DEFAULT_SERVER_PASSWORD);
|
||||
client.register(HttpAuthenticationFeature.basic(serverUsername, serverPassword));
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
|
||||
|
||||
/**
|
||||
* Receives shell commands from the client and executes them blindly. Intended only for use
|
||||
* by {@link ChaosMonkey} via {@link CoprocClusterManager}
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ShellExecEndpointCoprocessor extends ShellExecEndpoint.ShellExecService implements
|
||||
MasterCoprocessor, RegionServerCoprocessor {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ShellExecEndpointCoprocessor.class);
|
||||
|
||||
public static final String BACKGROUND_DELAY_MS_KEY = "hbase.it.shellexeccoproc.async.delay.ms";
|
||||
public static final long DEFAULT_BACKGROUND_DELAY_MS = 1_000;
|
||||
|
||||
private final ExecutorService backgroundExecutor;
|
||||
private Configuration conf;
|
||||
|
||||
public ShellExecEndpointCoprocessor() {
|
||||
backgroundExecutor = Executors.newSingleThreadExecutor(
|
||||
new ThreadFactoryBuilder()
|
||||
.setNameFormat(ShellExecEndpointCoprocessor.class.getSimpleName() + "-{}")
|
||||
.setDaemon(true)
|
||||
.setUncaughtExceptionHandler((t, e) -> LOG.warn("Thread {} threw", t, e))
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<Service> getServices() {
|
||||
return Collections.singletonList(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment env) {
|
||||
conf = env.getConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shellExec(
|
||||
final RpcController controller,
|
||||
final ShellExecRequest request,
|
||||
final RpcCallback<ShellExecResponse> done
|
||||
) {
|
||||
final String command = request.getCommand();
|
||||
if (StringUtils.isBlank(command)) {
|
||||
throw new RuntimeException("Request contained an empty command.");
|
||||
}
|
||||
final boolean awaitResponse = !request.hasAwaitResponse() || request.getAwaitResponse();
|
||||
final String[] subShellCmd = new String[] { "/usr/bin/env", "bash", "-c", command };
|
||||
final Shell.ShellCommandExecutor shell = new Shell.ShellCommandExecutor(subShellCmd);
|
||||
|
||||
final String msgFmt = "Executing command"
|
||||
+ (!awaitResponse ? " on a background thread" : "") + ": {}";
|
||||
LOG.info(msgFmt, command);
|
||||
|
||||
if (awaitResponse) {
|
||||
runForegroundTask(shell, controller, done);
|
||||
} else {
|
||||
runBackgroundTask(shell, done);
|
||||
}
|
||||
}
|
||||
|
||||
private void runForegroundTask(
|
||||
final Shell.ShellCommandExecutor shell,
|
||||
final RpcController controller,
|
||||
final RpcCallback<ShellExecResponse> done
|
||||
) {
|
||||
ShellExecResponse.Builder builder = ShellExecResponse.newBuilder();
|
||||
try {
|
||||
doExec(shell, builder);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failure launching process", e);
|
||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||
}
|
||||
done.run(builder.build());
|
||||
}
|
||||
|
||||
private void runBackgroundTask(
|
||||
final Shell.ShellCommandExecutor shell,
|
||||
final RpcCallback<ShellExecResponse> done
|
||||
) {
|
||||
final long sleepDuration = conf.getLong(BACKGROUND_DELAY_MS_KEY, DEFAULT_BACKGROUND_DELAY_MS);
|
||||
backgroundExecutor.submit(() -> {
|
||||
try {
|
||||
// sleep first so that the RPC can ACK. race condition here as we have no means of blocking
|
||||
// until the IPC response has been acknowledged by the client.
|
||||
Thread.sleep(sleepDuration);
|
||||
doExec(shell, ShellExecResponse.newBuilder());
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted before launching process.", e);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failure launching process", e);
|
||||
}
|
||||
});
|
||||
done.run(ShellExecResponse.newBuilder().build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute {@code shell} and collect results into {@code builder} as side-effects.
|
||||
*/
|
||||
private void doExec(
|
||||
final Shell.ShellCommandExecutor shell,
|
||||
final ShellExecResponse.Builder builder
|
||||
) throws IOException {
|
||||
try {
|
||||
shell.execute();
|
||||
builder
|
||||
.setExitCode(shell.getExitCode())
|
||||
.setStdout(shell.getOutput());
|
||||
} catch (Shell.ExitCodeException e) {
|
||||
LOG.warn("Launched process failed", e);
|
||||
builder
|
||||
.setExitCode(e.getExitCode())
|
||||
.setStdout(shell.getOutput())
|
||||
.setStderr(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,136 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.AsyncAdmin;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecService;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Test for the {@link ShellExecEndpointCoprocessor}.
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestShellExecEndpointCoprocessor {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule testRule =
|
||||
HBaseClassTestRule.forClass(TestShellExecEndpointCoprocessor.class);
|
||||
|
||||
@ClassRule
|
||||
public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
|
||||
.setConfiguration(createConfiguration())
|
||||
.build();
|
||||
|
||||
@Rule
|
||||
public final ConnectionRule connectionRule =
|
||||
new ConnectionRule(miniClusterRule::createConnection);
|
||||
|
||||
@Test
|
||||
public void testShellExecUnspecified() {
|
||||
testShellExecForeground(b -> {});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShellExecForeground() {
|
||||
testShellExecForeground(b -> b.setAwaitResponse(true));
|
||||
}
|
||||
|
||||
private void testShellExecForeground(final Consumer<ShellExecRequest.Builder> consumer) {
|
||||
final AsyncConnection conn = connectionRule.getConnection();
|
||||
final AsyncAdmin admin = conn.getAdmin();
|
||||
|
||||
final String command = "echo -n \"hello world\"";
|
||||
final ShellExecRequest.Builder builder = ShellExecRequest.newBuilder()
|
||||
.setCommand(command);
|
||||
consumer.accept(builder);
|
||||
final ShellExecResponse resp = admin
|
||||
.<ShellExecService.Stub, ShellExecResponse>coprocessorService(
|
||||
ShellExecService::newStub,
|
||||
(stub, controller, callback) -> stub.shellExec(controller, builder.build(), callback))
|
||||
.join();
|
||||
assertEquals(0, resp.getExitCode());
|
||||
assertEquals("hello world", resp.getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShellExecBackground() throws IOException {
|
||||
final AsyncConnection conn = connectionRule.getConnection();
|
||||
final AsyncAdmin admin = conn.getAdmin();
|
||||
|
||||
final File testDataDir = ensureTestDataDirExists(miniClusterRule.getTestingUtility());
|
||||
final File testFile = new File(testDataDir, "shell_exec_background.txt");
|
||||
assertTrue(testFile.createNewFile());
|
||||
assertEquals(0, testFile.length());
|
||||
|
||||
final String command = "echo \"hello world\" >> " + testFile.getAbsolutePath();
|
||||
final ShellExecRequest req = ShellExecRequest.newBuilder()
|
||||
.setCommand(command)
|
||||
.setAwaitResponse(false)
|
||||
.build();
|
||||
final ShellExecResponse resp = admin
|
||||
.<ShellExecService.Stub, ShellExecResponse>coprocessorService(
|
||||
ShellExecService::newStub,
|
||||
(stub, controller, callback) -> stub.shellExec(controller, req, callback))
|
||||
.join();
|
||||
|
||||
assertFalse("the response from a background task should have no exit code", resp.hasExitCode());
|
||||
assertFalse("the response from a background task should have no stdout", resp.hasStdout());
|
||||
assertFalse("the response from a background task should have no stderr", resp.hasStderr());
|
||||
|
||||
Waiter.waitFor(conn.getConfiguration(), 5_000, () -> testFile.length() > 0);
|
||||
final String content = new String(Files.readAllBytes(testFile.toPath())).trim();
|
||||
assertEquals("hello world", content);
|
||||
}
|
||||
|
||||
private static File ensureTestDataDirExists(
|
||||
final HBaseTestingUtility testingUtility
|
||||
) throws IOException {
|
||||
final Path testDataDir = Optional.of(testingUtility)
|
||||
.map(HBaseTestingUtility::getDataTestDir)
|
||||
.map(Object::toString)
|
||||
.map(val -> Paths.get(val))
|
||||
.orElseThrow(() -> new RuntimeException("Unable to locate temp directory path."));
|
||||
final File testDataDirFile = Files.createDirectories(testDataDir).toFile();
|
||||
assertTrue(testDataDirFile.exists());
|
||||
return testDataDirFile;
|
||||
}
|
||||
|
||||
private static Configuration createConfiguration() {
|
||||
final Configuration conf = HBaseConfiguration.create();
|
||||
conf.set("hbase.coprocessor.master.classes", ShellExecEndpointCoprocessor.class.getName());
|
||||
return conf;
|
||||
}
|
||||
}
|
|
@ -102,6 +102,7 @@ public abstract class MonkeyFactory {
|
|||
try {
|
||||
klass = Class.forName(factoryName);
|
||||
if (klass != null) {
|
||||
LOG.info("Instantiating {}", klass.getName());
|
||||
fact = (MonkeyFactory) ReflectionUtils.newInstance(klass);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -120,7 +120,7 @@ public class ChaosMonkeyRunner extends AbstractHBaseTool {
|
|||
util.createDistributedHBaseCluster();
|
||||
util.checkNodeCount(1);// make sure there's at least 1 alive rs
|
||||
} else {
|
||||
throw new RuntimeException("ChaosMonkeyRunner must run againt a distributed cluster,"
|
||||
throw new RuntimeException("ChaosMonkeyRunner must run against a distributed cluster,"
|
||||
+ " please check and point to the right configuration dir");
|
||||
}
|
||||
this.setConf(util.getConfiguration());
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Opens a tunnel for remote shell execution on the target server. Used by `CoprocClusterManager`.
|
||||
*/
|
||||
|
||||
syntax = "proto2";
|
||||
option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
|
||||
option java_outer_classname = "ShellExecEndpoint";
|
||||
option java_generic_services = true;
|
||||
option java_generate_equals_and_hash = true;
|
||||
|
||||
message ShellExecRequest {
|
||||
required string command = 1;
|
||||
optional bool await_response = 2;
|
||||
}
|
||||
|
||||
message ShellExecResponse {
|
||||
optional int32 exit_code = 1;
|
||||
optional string stdout = 2;
|
||||
optional string stderr = 3;
|
||||
}
|
||||
|
||||
service ShellExecService {
|
||||
rpc shell_exec(ShellExecRequest) returns(ShellExecResponse);
|
||||
}
|
|
@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
|
|||
* <pre>{@code
|
||||
* public class TestMyClass {
|
||||
* @ClassRule
|
||||
* public static final MiniClusterRule miniClusterRule = new MiniClusterRule();
|
||||
* public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
|
||||
*
|
||||
* private final ConnectionRule connectionRule =
|
||||
* new ConnectionRule(miniClusterRule::createConnection);
|
||||
|
|
|
@ -33,13 +33,13 @@ import org.junit.rules.ExternalResource;
|
|||
*
|
||||
* <pre>{@code
|
||||
* public class TestMyClass {
|
||||
* private static final MiniClusterRule miniClusterRule = new MiniClusterRule();
|
||||
* private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
|
||||
* private static final ConnectionRule connectionRule =
|
||||
* new ConnectionRule(miniClusterRule::createConnection);
|
||||
*
|
||||
* @ClassRule
|
||||
* public static final TestRule rule = RuleChain
|
||||
* .outerRule(connectionRule)
|
||||
* .outerRule(miniClusterRule)
|
||||
* .around(connectionRule);
|
||||
* }
|
||||
* }</pre>
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -36,7 +37,7 @@ import org.junit.rules.TestRule;
|
|||
* <pre>{@code
|
||||
* public class TestMyClass {
|
||||
* @ClassRule
|
||||
* public static final MiniClusterRule miniClusterRule = new MiniClusterRule();
|
||||
* public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
|
||||
*
|
||||
* @Rule
|
||||
* public final ConnectionRule connectionRule =
|
||||
|
@ -44,25 +45,54 @@ import org.junit.rules.TestRule;
|
|||
* }
|
||||
* }</pre>
|
||||
*/
|
||||
public class MiniClusterRule extends ExternalResource {
|
||||
public final class MiniClusterRule extends ExternalResource {
|
||||
|
||||
/**
|
||||
* A builder for fluent composition of a new {@link MiniClusterRule}.
|
||||
*/
|
||||
public static class Builder {
|
||||
|
||||
private StartMiniClusterOption miniClusterOption;
|
||||
private Configuration conf;
|
||||
|
||||
/**
|
||||
* Use the provided {@link StartMiniClusterOption} to construct the {@link MiniHBaseCluster}.
|
||||
*/
|
||||
public Builder setMiniClusterOption(final StartMiniClusterOption miniClusterOption) {
|
||||
this.miniClusterOption = miniClusterOption;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Seed the underlying {@link HBaseTestingUtility} with the provided {@link Configuration}.
|
||||
*/
|
||||
public Builder setConfiguration(final Configuration conf) {
|
||||
this.conf = conf;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MiniClusterRule build() {
|
||||
return new MiniClusterRule(
|
||||
conf,
|
||||
miniClusterOption != null
|
||||
? miniClusterOption
|
||||
: StartMiniClusterOption.builder().build());
|
||||
}
|
||||
}
|
||||
|
||||
private final HBaseTestingUtility testingUtility;
|
||||
private final StartMiniClusterOption miniClusterOptions;
|
||||
|
||||
private MiniHBaseCluster miniCluster;
|
||||
|
||||
/**
|
||||
* Create an instance over the default options provided by {@link StartMiniClusterOption}.
|
||||
*/
|
||||
public MiniClusterRule() {
|
||||
this(StartMiniClusterOption.builder().build());
|
||||
private MiniClusterRule(final Configuration conf,
|
||||
final StartMiniClusterOption miniClusterOptions) {
|
||||
this.testingUtility = new HBaseTestingUtility(conf);
|
||||
this.miniClusterOptions = miniClusterOptions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an instance using the provided {@link StartMiniClusterOption}.
|
||||
*/
|
||||
public MiniClusterRule(final StartMiniClusterOption miniClusterOptions) {
|
||||
this.testingUtility = new HBaseTestingUtility();
|
||||
this.miniClusterOptions = miniClusterOptions;
|
||||
public static Builder newBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -36,13 +36,18 @@ public class TestAlwaysStandByHMaster {
|
|||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAlwaysStandByHMaster.class);
|
||||
HBaseClassTestRule.forClass(TestAlwaysStandByHMaster.class);
|
||||
|
||||
private static final StartMiniClusterOption OPTION = StartMiniClusterOption.builder().
|
||||
numAlwaysStandByMasters(1).numMasters(1).numRegionServers(1).build();
|
||||
private static final StartMiniClusterOption OPTION = StartMiniClusterOption.builder()
|
||||
.numAlwaysStandByMasters(1)
|
||||
.numMasters(1)
|
||||
.numRegionServers(1)
|
||||
.build();
|
||||
|
||||
@ClassRule
|
||||
public static final MiniClusterRule miniClusterRule = new MiniClusterRule(OPTION);
|
||||
public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
|
||||
.setMiniClusterOption(OPTION)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* Tests that the AlwaysStandByHMaster does not transition to active state even if no active
|
||||
|
|
|
@ -65,7 +65,7 @@ public class TestMetaBrowser {
|
|||
public static final HBaseClassTestRule testRule =
|
||||
HBaseClassTestRule.forClass(TestMetaBrowser.class);
|
||||
@ClassRule
|
||||
public static final MiniClusterRule miniClusterRule = new MiniClusterRule();
|
||||
public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
|
||||
|
||||
private final ConnectionRule connectionRule =
|
||||
new ConnectionRule(miniClusterRule::createConnection);
|
||||
|
|
Loading…
Reference in New Issue