HBASE-21023 Added bypassProcedure() API to HbckService
This commit is contained in:
parent
3a0fcd56cf
commit
dc767c06d2
|
@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateResponse;
|
||||||
|
@ -128,4 +130,25 @@ public class HBaseHbck implements Hbck {
|
||||||
private static String toCommaDelimitedString(List<String> list) {
|
private static String toCommaDelimitedString(List<String> list) {
|
||||||
return list.stream().collect(Collectors.joining(", "));
|
return list.stream().collect(Collectors.joining(", "));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Boolean> bypassProcedure(List<Long> pids, long waitTime, boolean force)
|
||||||
|
throws IOException {
|
||||||
|
MasterProtos.BypassProcedureResponse response = ProtobufUtil.call(
|
||||||
|
new Callable<MasterProtos.BypassProcedureResponse>() {
|
||||||
|
@Override
|
||||||
|
public MasterProtos.BypassProcedureResponse call() throws Exception {
|
||||||
|
try {
|
||||||
|
return hbck.bypassProcedure(rpcControllerFactory.newController(),
|
||||||
|
MasterProtos.BypassProcedureRequest.newBuilder().addAllProcId(pids).
|
||||||
|
setWaitTime(waitTime).setForce(force).build());
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.error(pids.stream().map(i -> i.toString()).
|
||||||
|
collect(Collectors.joining(", ")), t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return response.getBypassedList();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,15 +26,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hbck APIs for HBase. Obtain an instance from {@link ClusterConnection#getHbck()} and call
|
* Hbck fixup tool APIs. Obtain an instance from {@link ClusterConnection#getHbck()} and call
|
||||||
* {@link #close()} when done.
|
* {@link #close()} when done.
|
||||||
* <p>Hbck client APIs will be mostly used by hbck tool which in turn can be used by operators to
|
* <p>WARNING: the below methods can damage the cluster. For experienced users only.
|
||||||
* fix HBase and bringing it to consistent state.</p>
|
|
||||||
*
|
|
||||||
* <p>NOTE: The methods in here can do damage to a cluster if applied in the wrong sequence or at
|
|
||||||
* the wrong time. Use with caution. For experts only. These methods are only for the
|
|
||||||
* extreme case where the cluster has been damaged or has achieved an inconsistent state because
|
|
||||||
* of some unforeseen circumstance or bug and requires manual intervention.
|
|
||||||
*
|
*
|
||||||
* @see ConnectionFactory
|
* @see ConnectionFactory
|
||||||
* @see ClusterConnection
|
* @see ClusterConnection
|
||||||
|
@ -45,10 +39,6 @@ public interface Hbck extends Abortable, Closeable {
|
||||||
/**
|
/**
|
||||||
* Update table state in Meta only. No procedures are submitted to open/assign or
|
* Update table state in Meta only. No procedures are submitted to open/assign or
|
||||||
* close/unassign regions of the table.
|
* close/unassign regions of the table.
|
||||||
*
|
|
||||||
* <p>>NOTE: This is a dangerous action, as existing running procedures for the table or regions
|
|
||||||
* which belong to the table may get confused.
|
|
||||||
*
|
|
||||||
* @param state table state
|
* @param state table state
|
||||||
* @return previous state of the table in Meta
|
* @return previous state of the table in Meta
|
||||||
*/
|
*/
|
||||||
|
@ -75,4 +65,17 @@ public interface Hbck extends Abortable, Closeable {
|
||||||
* example of what a random user-space encoded Region name looks like.
|
* example of what a random user-space encoded Region name looks like.
|
||||||
*/
|
*/
|
||||||
List<Long> unassigns(List<String> encodedRegionNames) throws IOException;
|
List<Long> unassigns(List<String> encodedRegionNames) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bypass specified procedure and move it to completion. Procedure is marked completed but
|
||||||
|
* no actual work is done from the current state/step onwards. Parents of the procedure are
|
||||||
|
* also marked for bypass.
|
||||||
|
*
|
||||||
|
* @param pids of procedures to complete.
|
||||||
|
* @param waitTime wait time in ms for acquiring lock for a procedure
|
||||||
|
* @param force if force set to true, we will bypass the procedure even if it is executing.
|
||||||
|
* This is for procedures which can't break out during execution (bugs?).
|
||||||
|
* @return true if procedure is marked for bypass successfully, false otherwise
|
||||||
|
*/
|
||||||
|
List<Boolean> bypassProcedure(List<Long> pids, long waitTime, boolean force) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
|
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
|
||||||
|
@ -996,7 +997,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
* <p>
|
* <p>
|
||||||
* If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue.
|
* If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue.
|
||||||
* TODO: What about WAITING_TIMEOUT?
|
* TODO: What about WAITING_TIMEOUT?
|
||||||
* @param id the procedure id
|
* @param pids the procedure id
|
||||||
* @param lockWait time to wait lock
|
* @param lockWait time to wait lock
|
||||||
* @param force if force set to true, we will bypass the procedure even if it is executing.
|
* @param force if force set to true, we will bypass the procedure even if it is executing.
|
||||||
* This is for procedures which can't break out during executing(due to bug, mostly)
|
* This is for procedures which can't break out during executing(due to bug, mostly)
|
||||||
|
@ -1007,15 +1008,23 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
* @return true if bypass success
|
* @return true if bypass success
|
||||||
* @throws IOException IOException
|
* @throws IOException IOException
|
||||||
*/
|
*/
|
||||||
public boolean bypassProcedure(long id, long lockWait, boolean force) throws IOException {
|
public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force)
|
||||||
Procedure<TEnvironment> procedure = getProcedure(id);
|
throws IOException {
|
||||||
|
List<Boolean> result = new ArrayList<Boolean>(pids.size());
|
||||||
|
for(long pid: pids) {
|
||||||
|
result.add(bypassProcedure(pid, lockWait, force));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean bypassProcedure(long pid, long lockWait, boolean force) throws IOException {
|
||||||
|
Procedure<TEnvironment> procedure = getProcedure(pid);
|
||||||
if (procedure == null) {
|
if (procedure == null) {
|
||||||
LOG.debug("Procedure with id={} does not exist, skipping bypass", id);
|
LOG.debug("Procedure with id={} does not exist, skipping bypass", pid);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("Begin bypass {} with lockWait={}, force={}", procedure, lockWait, force);
|
LOG.debug("Begin bypass {} with lockWait={}, force={}", procedure, lockWait, force);
|
||||||
|
|
||||||
IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
|
IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
|
||||||
if (lockEntry == null && !force) {
|
if (lockEntry == null && !force) {
|
||||||
LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}",
|
LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}",
|
||||||
|
|
|
@ -1032,6 +1032,16 @@ message UnassignsResponse {
|
||||||
repeated uint64 pid = 1;
|
repeated uint64 pid = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message BypassProcedureRequest {
|
||||||
|
repeated uint64 proc_id = 1;
|
||||||
|
optional uint64 waitTime = 2; // wait time in ms to acquire lock on a procedure
|
||||||
|
optional bool force = 3; // if true, procedure is marked for bypass even if its executing
|
||||||
|
}
|
||||||
|
|
||||||
|
message BypassProcedureResponse {
|
||||||
|
repeated bool bypassed = 1;
|
||||||
|
}
|
||||||
|
|
||||||
service HbckService {
|
service HbckService {
|
||||||
/** Update state of the table in meta only*/
|
/** Update state of the table in meta only*/
|
||||||
rpc SetTableStateInMeta(SetTableStateInMetaRequest)
|
rpc SetTableStateInMeta(SetTableStateInMetaRequest)
|
||||||
|
@ -1054,4 +1064,8 @@ service HbckService {
|
||||||
*/
|
*/
|
||||||
rpc Unassigns(UnassignsRequest)
|
rpc Unassigns(UnassignsRequest)
|
||||||
returns(UnassignsResponse);
|
returns(UnassignsResponse);
|
||||||
|
|
||||||
|
/** Bypass a procedure to completion, procedure is completed but no actual work is done*/
|
||||||
|
rpc BypassProcedure(BypassProcedureRequest)
|
||||||
|
returns(BypassProcedureResponse);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2426,4 +2426,28 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
throw new ServiceException(ioe);
|
throw new ServiceException(ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bypass specified procedure to completion. Procedure is marked completed but no actual work
|
||||||
|
* is done from the current state/ step onwards. Parents of the procedure are also marked for
|
||||||
|
* bypass.
|
||||||
|
*
|
||||||
|
* NOTE: this is a dangerous operation and may be used to unstuck buggy procedures. This may
|
||||||
|
* leave system in inconherent state. This may need to be followed by some cleanup steps/
|
||||||
|
* actions by operator.
|
||||||
|
*
|
||||||
|
* @return BypassProcedureToCompletionResponse indicating success or failure
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public MasterProtos.BypassProcedureResponse bypassProcedure(RpcController controller,
|
||||||
|
MasterProtos.BypassProcedureRequest request) throws ServiceException {
|
||||||
|
try {
|
||||||
|
List<Boolean> ret =
|
||||||
|
master.getMasterProcedureExecutor().bypassProcedure(request.getProcIdList(),
|
||||||
|
request.getWaitTime(), request.getForce());
|
||||||
|
return MasterProtos.BypassProcedureResponse.newBuilder().addAllBypassed(ret).build();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.master.RegionState;
|
import org.apache.hadoop.hbase.master.RegionState;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -63,10 +69,13 @@ public class TestHbck {
|
||||||
|
|
||||||
private static final TableName TABLE_NAME = TableName.valueOf(TestHbck.class.getSimpleName());
|
private static final TableName TABLE_NAME = TableName.valueOf(TestHbck.class.getSimpleName());
|
||||||
|
|
||||||
|
private static ProcedureExecutor<MasterProcedureEnv> procExec;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
TEST_UTIL.startMiniCluster(3);
|
TEST_UTIL.startMiniCluster(3);
|
||||||
TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("family1"), 5);
|
TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("family1"), 5);
|
||||||
|
procExec = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -74,6 +83,45 @@ public class TestHbck {
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class SuspendProcedure extends
|
||||||
|
ProcedureTestingUtility.NoopProcedure<MasterProcedureEnv> implements TableProcedureInterface {
|
||||||
|
public SuspendProcedure() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Procedure[] execute(final MasterProcedureEnv env)
|
||||||
|
throws ProcedureSuspendedException {
|
||||||
|
// Always suspend the procedure
|
||||||
|
throw new ProcedureSuspendedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableName getTableName() {
|
||||||
|
return TABLE_NAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableOperationType getTableOperationType() {
|
||||||
|
return TableOperationType.READ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBypassProcedure() throws Exception {
|
||||||
|
// SuspendProcedure
|
||||||
|
final SuspendProcedure proc = new SuspendProcedure();
|
||||||
|
long procId = procExec.submitProcedure(proc);
|
||||||
|
Thread.sleep(500);
|
||||||
|
|
||||||
|
//bypass the procedure
|
||||||
|
List<Long> pids = Arrays.<Long>asList(procId);
|
||||||
|
List<Boolean> results = TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false);
|
||||||
|
assertTrue("Failed to by pass procedure!", results.get(0));
|
||||||
|
TEST_UTIL.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||||
|
LOG.info("{} finished", proc);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSetTableStateInMeta() throws IOException {
|
public void testSetTableStateInMeta() throws IOException {
|
||||||
Hbck hbck = TEST_UTIL.getHbck();
|
Hbck hbck = TEST_UTIL.getHbck();
|
||||||
|
|
Loading…
Reference in New Issue