HBASE-21322 Add a scheduleServerCrashProcedure() API to HbckService
Signed-off-by: Michael Stack <stack@apache.org>
parent 644ac13850
commit 422e98957b
HBaseHbck.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.HbckService.BlockingInterface;
@@ -154,4 +155,21 @@ public class HBaseHbck implements Hbck {
     });
     return response.getBypassedList();
   }
+
+  @Override
+  public List<Long> scheduleServerCrashProcedure(List<HBaseProtos.ServerName> serverNames)
+      throws IOException {
+    try {
+      MasterProtos.ScheduleServerCrashProcedureResponse response =
+          this.hbck.scheduleServerCrashProcedure(rpcControllerFactory.newController(),
+            RequestConverter.toScheduleServerCrashProcedureRequest(serverNames));
+      return response.getPidList();
+    } catch (ServiceException se) {
+      LOG.debug(toCommaDelimitedString(
+        serverNames.stream().map(serverName -> ProtobufUtil.toServerName(serverName).toString())
+            .collect(Collectors.toList())),
+        se);
+      throw new IOException(se);
+    }
+  }
 }
Hbck.java

@@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
 /**
  * Hbck fixup tool APIs. Obtain an instance from {@link ClusterConnection#getHbck()} and call
  * {@link #close()} when done.
@@ -101,4 +103,7 @@ public interface Hbck extends Abortable, Closeable {
   */
  List<Boolean> bypassProcedure(List<Long> pids, long waitTime, boolean override, boolean recursive)
    throws IOException;
+
+  List<Long> scheduleServerCrashProcedure(List<HBaseProtos.ServerName> serverNames)
+      throws IOException;
 }
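Together, the two client-side changes above expose the new call through the Hbck fixup API. A minimal usage sketch, not part of the patch: `conn` is assumed to be a ClusterConnection obtained elsewhere, and the hostname, port, and start code are made-up values; per the Javadoc above, Hbck instances come from ClusterConnection#getHbck().

  import java.util.Arrays;
  import java.util.List;
  import org.apache.hadoop.hbase.ServerName;
  import org.apache.hadoop.hbase.client.Hbck;
  import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

  // Schedule recovery for a server the Master has not noticed crashing.
  // The server coordinates below are hypothetical.
  ServerName dead = ServerName.valueOf("host1.example.com", 16020, 1539724800000L);
  try (Hbck hbck = conn.getHbck()) {
    List<Long> pids = hbck.scheduleServerCrashProcedure(
        Arrays.asList(ProtobufUtil.toServerName(dead)));
    // pid > 0: a ServerCrashProcedure was submitted for the server;
    // pid == -1: the Master found an unfinished SCP for it and skipped it.
    System.out.println("Scheduled SCP pids=" + pids);
  }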
RequestConverter.java

@@ -1897,6 +1897,13 @@ public final class RequestConverter {
         setOverride(override).build();
   }
 
+  public static MasterProtos.ScheduleServerCrashProcedureRequest
+      toScheduleServerCrashProcedureRequest(List<HBaseProtos.ServerName> serverNames) {
+    MasterProtos.ScheduleServerCrashProcedureRequest.Builder b =
+        MasterProtos.ScheduleServerCrashProcedureRequest.newBuilder();
+    return b.addAllServerName(serverNames).build();
+  }
+
   private static List<RegionSpecifier> toEncodedRegionNameRegionSpecifiers(
       List<String> encodedRegionNames) {
     return encodedRegionNames.stream().
Master.proto

@@ -1042,6 +1042,14 @@ message BypassProcedureResponse {
   repeated bool bypassed = 1;
 }
 
+message ScheduleServerCrashProcedureRequest {
+  repeated ServerName serverName = 1;
+}
+
+message ScheduleServerCrashProcedureResponse {
+  repeated uint64 pid = 1;
+}
+
 service HbckService {
   /** Update state of the table in meta only*/
   rpc SetTableStateInMeta(SetTableStateInMetaRequest)
@@ -1068,4 +1076,8 @@ service HbckService {
   /** Bypass a procedure to completion, procedure is completed but no actual work is done*/
   rpc BypassProcedure(BypassProcedureRequest)
     returns(BypassProcedureResponse);
+
+  /** Schedule a ServerCrashProcedure to help recover a crashed server */
+  rpc ScheduleServerCrashProcedure(ScheduleServerCrashProcedureRequest)
+    returns(ScheduleServerCrashProcedureResponse);
 }
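The wire contract is one pid per requested server, returned in request order. For reference, a hand-rolled construction of the request, equivalent to the RequestConverter helper above for a single server; `pbServerName` is a placeholder for an already-converted HBaseProtos.ServerName:

  // Equivalent to RequestConverter.toScheduleServerCrashProcedureRequest(...)
  // for one server; pbServerName is hypothetical here.
  MasterProtos.ScheduleServerCrashProcedureRequest request =
      MasterProtos.ScheduleServerCrashProcedureRequest.newBuilder()
          .addServerName(pbServerName)
          .build();
  // On the response side, the generated accessor for "repeated uint64 pid" is:
  // List<Long> pids = response.getPidList();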
MasterRpcServices.java

@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
+
 import java.io.IOException;
 import java.net.BindException;
 import java.net.InetAddress;
@@ -32,6 +34,7 @@ import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClusterMetricsBuilder;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -65,8 +68,10 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.locking.LockProcedure;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
 import org.apache.hadoop.hbase.procedure2.LockType;
@@ -97,6 +102,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -2424,4 +2430,52 @@ public class MasterRpcServices extends RSRpcServices
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public MasterProtos.ScheduleServerCrashProcedureResponse scheduleServerCrashProcedure(
+      RpcController controller, MasterProtos.ScheduleServerCrashProcedureRequest request)
+      throws ServiceException {
+    List<HBaseProtos.ServerName> serverNames = request.getServerNameList();
+    List<Long> pids = new ArrayList<>();
+    try {
+      for (HBaseProtos.ServerName serverName : serverNames) {
+        ServerName server = ProtobufUtil.toServerName(serverName);
+        if (shouldSubmitSCP(server)) {
+          ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
+          pids.add(procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
+            server, true, containMetaWals(server))));
+        } else {
+          pids.add(-1L);
+        }
+      }
+      return MasterProtos.ScheduleServerCrashProcedureResponse.newBuilder().addAllPid(pids).build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  private boolean containMetaWals(ServerName serverName) throws IOException {
+    Path logDir = new Path(master.getWALRootDir(),
+        AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+    Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
+    Path checkDir = master.getFileSystem().exists(splitDir) ? splitDir : logDir;
+    return master.getFileSystem().listStatus(checkDir, META_FILTER).length > 0;
+  }
+
+  private boolean shouldSubmitSCP(ServerName serverName) {
+    // check if there is already a SCP of this server running
+    List<Procedure<MasterProcedureEnv>> procedures =
+        master.getMasterProcedureExecutor().getProcedures();
+    for (Procedure<MasterProcedureEnv> procedure : procedures) {
+      if (procedure instanceof ServerCrashProcedure) {
+        if (serverName.compareTo(((ServerCrashProcedure) procedure).getServerName()) == 0
+            && !procedure.isFinished()) {
+          LOG.info("There is already a SCP of this server {} running, pid {}", serverName,
+            procedure.getProcId());
+          return false;
+        }
+      }
+    }
+    return true;
+  }
 }
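Two details of the server-side implementation are worth calling out. First, the call is safe to repeat: shouldSubmitSCP scans the procedure executor for an unfinished ServerCrashProcedure on the same server and, if one exists, reports -1 for that slot instead of submitting a duplicate. Second, containMetaWals inspects the server's WAL directory (preferring the .splitting variant when it exists, else the live directory) for meta WALs, so the procedure is submitted with the correct carryingMeta flag; shouldSplitWal is hard-coded to true.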
AssignmentManager.java

@@ -1330,13 +1330,14 @@ public class AssignmentManager implements ServerListener {
     return 0;
   }
 
-  public void submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) {
+  public long submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) {
     boolean carryingMeta = isCarryingMeta(serverName);
     ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
-    procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName,
-      shouldSplitWal, carryingMeta));
-    LOG.debug("Added=" + serverName +
-      " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
+    long pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
+      serverName, shouldSplitWal, carryingMeta));
+    LOG.debug("Added=" + serverName
+      + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
+    return pid;
   }
 
   public void offlineRegion(final RegionInfo regionInfo) {
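The AssignmentManager change is pure plumbing: submitServerCrash now returns the pid of the ServerCrashProcedure it submits so callers can surface it, and existing callers that ignored the old void return compile unchanged.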
TestHbck.java

@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -50,6 +52,8 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+
 /**
  * Class to test HBaseHbck.
  * Spins up the minicluster once at test start and then takes it down afterward.
@@ -173,6 +177,23 @@ public class TestHbck {
     }
   }
 
+  @Test
+  public void testScheduleSCP() throws Exception {
+    HRegionServer testRs = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME);
+    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), Bytes.toBytes("family1"),
+      true);
+    ServerName serverName = testRs.getServerName();
+    Hbck hbck = TEST_UTIL.getHbck();
+    List<Long> pids =
+        hbck.scheduleServerCrashProcedure(Arrays.asList(ProtobufUtil.toServerName(serverName)));
+    assertTrue(pids.get(0) > 0);
+    LOG.info("pid is {}", pids.get(0));
+
+    pids = hbck.scheduleServerCrashProcedure(Arrays.asList(ProtobufUtil.toServerName(serverName)));
+    assertTrue(pids.get(0) == -1);
+    LOG.info("pid is {}", pids.get(0));
+  }
+
   private void waitOnPids(List<Long> pids) {
     for (Long pid: pids) {
       while (!TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().
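The new testScheduleSCP exercises both branches: the first scheduleServerCrashProcedure call yields a real pid (asserted > 0), while the immediate second call for the same server yields -1 because the SCP from the first call is still running.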