HBASE-21023 Added bypassProcedure() API to HbckService
This commit is contained in:
parent
8876f12c0c
commit
899fddb4e7
|
@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateResponse;
|
||||
|
@ -128,4 +130,25 @@ public class HBaseHbck implements Hbck {
|
|||
private static String toCommaDelimitedString(List<String> list) {
|
||||
return list.stream().collect(Collectors.joining(", "));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Boolean> bypassProcedure(List<Long> pids, long waitTime, boolean force)
|
||||
throws IOException {
|
||||
MasterProtos.BypassProcedureResponse response = ProtobufUtil.call(
|
||||
new Callable<MasterProtos.BypassProcedureResponse>() {
|
||||
@Override
|
||||
public MasterProtos.BypassProcedureResponse call() throws Exception {
|
||||
try {
|
||||
return hbck.bypassProcedure(rpcControllerFactory.newController(),
|
||||
MasterProtos.BypassProcedureRequest.newBuilder().addAllProcId(pids).
|
||||
setWaitTime(waitTime).setForce(force).build());
|
||||
} catch (Throwable t) {
|
||||
LOG.error(pids.stream().map(i -> i.toString()).
|
||||
collect(Collectors.joining(", ")), t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
});
|
||||
return response.getBypassedList();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,15 +26,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Hbck APIs for HBase. Obtain an instance from {@link ClusterConnection#getHbck()} and call
|
||||
* Hbck fixup tool APIs. Obtain an instance from {@link ClusterConnection#getHbck()} and call
|
||||
* {@link #close()} when done.
|
||||
* <p>Hbck client APIs will be mostly used by hbck tool which in turn can be used by operators to
|
||||
* fix HBase and bringing it to consistent state.</p>
|
||||
*
|
||||
* <p>NOTE: The methods in here can do damage to a cluster if applied in the wrong sequence or at
|
||||
* the wrong time. Use with caution. For experts only. These methods are only for the
|
||||
* extreme case where the cluster has been damaged or has achieved an inconsistent state because
|
||||
* of some unforeseen circumstance or bug and requires manual intervention.
|
||||
* <p>WARNING: the below methods can damage the cluster. For experienced users only.
|
||||
*
|
||||
* @see ConnectionFactory
|
||||
* @see ClusterConnection
|
||||
|
@ -45,10 +39,6 @@ public interface Hbck extends Abortable, Closeable {
|
|||
/**
|
||||
* Update table state in Meta only. No procedures are submitted to open/assign or
|
||||
* close/unassign regions of the table.
|
||||
*
|
||||
* <p>>NOTE: This is a dangerous action, as existing running procedures for the table or regions
|
||||
* which belong to the table may get confused.
|
||||
*
|
||||
* @param state table state
|
||||
* @return previous state of the table in Meta
|
||||
*/
|
||||
|
@ -75,4 +65,17 @@ public interface Hbck extends Abortable, Closeable {
|
|||
* example of what a random user-space encoded Region name looks like.
|
||||
*/
|
||||
List<Long> unassigns(List<String> encodedRegionNames) throws IOException;
|
||||
|
||||
/**
|
||||
* Bypass specified procedure and move it to completion. Procedure is marked completed but
|
||||
* no actual work is done from the current state/step onwards. Parents of the procedure are
|
||||
* also marked for bypass.
|
||||
*
|
||||
* @param pids of procedures to complete.
|
||||
* @param waitTime wait time in ms for acquiring lock for a procedure
|
||||
* @param force if force set to true, we will bypass the procedure even if it is executing.
|
||||
* This is for procedures which can't break out during execution (bugs?).
|
||||
* @return true if procedure is marked for bypass successfully, false otherwise
|
||||
*/
|
||||
List<Boolean> bypassProcedure(List<Long> pids, long waitTime, boolean force) throws IOException;
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
|
||||
|
@ -996,7 +997,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
* <p>
|
||||
* If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue.
|
||||
* TODO: What about WAITING_TIMEOUT?
|
||||
* @param id the procedure id
|
||||
* @param pids the procedure id
|
||||
* @param lockWait time to wait lock
|
||||
* @param force if force set to true, we will bypass the procedure even if it is executing.
|
||||
* This is for procedures which can't break out during executing(due to bug, mostly)
|
||||
|
@ -1007,15 +1008,23 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
* @return true if bypass success
|
||||
* @throws IOException IOException
|
||||
*/
|
||||
public boolean bypassProcedure(long id, long lockWait, boolean force) throws IOException {
|
||||
Procedure<TEnvironment> procedure = getProcedure(id);
|
||||
public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force)
|
||||
throws IOException {
|
||||
List<Boolean> result = new ArrayList<Boolean>(pids.size());
|
||||
for(long pid: pids) {
|
||||
result.add(bypassProcedure(pid, lockWait, force));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
boolean bypassProcedure(long pid, long lockWait, boolean force) throws IOException {
|
||||
Procedure<TEnvironment> procedure = getProcedure(pid);
|
||||
if (procedure == null) {
|
||||
LOG.debug("Procedure with id={} does not exist, skipping bypass", id);
|
||||
LOG.debug("Procedure with id={} does not exist, skipping bypass", pid);
|
||||
return false;
|
||||
}
|
||||
|
||||
LOG.debug("Begin bypass {} with lockWait={}, force={}", procedure, lockWait, force);
|
||||
|
||||
IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
|
||||
if (lockEntry == null && !force) {
|
||||
LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}",
|
||||
|
|
|
@ -1028,6 +1028,16 @@ message UnassignsResponse {
|
|||
repeated uint64 pid = 1;
|
||||
}
|
||||
|
||||
message BypassProcedureRequest {
|
||||
repeated uint64 proc_id = 1;
|
||||
optional uint64 waitTime = 2; // wait time in ms to acquire lock on a procedure
|
||||
optional bool force = 3; // if true, procedure is marked for bypass even if its executing
|
||||
}
|
||||
|
||||
message BypassProcedureResponse {
|
||||
repeated bool bypassed = 1;
|
||||
}
|
||||
|
||||
service HbckService {
|
||||
/** Update state of the table in meta only*/
|
||||
rpc SetTableStateInMeta(SetTableStateInMetaRequest)
|
||||
|
@ -1050,4 +1060,8 @@ service HbckService {
|
|||
*/
|
||||
rpc Unassigns(UnassignsRequest)
|
||||
returns(UnassignsResponse);
|
||||
|
||||
/** Bypass a procedure to completion, procedure is completed but no actual work is done*/
|
||||
rpc BypassProcedure(BypassProcedureRequest)
|
||||
returns(BypassProcedureResponse);
|
||||
}
|
||||
|
|
|
@ -2392,4 +2392,28 @@ public class MasterRpcServices extends RSRpcServices
|
|||
throw new ServiceException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Bypass specified procedure to completion. Procedure is marked completed but no actual work
|
||||
* is done from the current state/ step onwards. Parents of the procedure are also marked for
|
||||
* bypass.
|
||||
*
|
||||
* NOTE: this is a dangerous operation and may be used to unstuck buggy procedures. This may
|
||||
* leave system in inconherent state. This may need to be followed by some cleanup steps/
|
||||
* actions by operator.
|
||||
*
|
||||
* @return BypassProcedureToCompletionResponse indicating success or failure
|
||||
*/
|
||||
@Override
|
||||
public MasterProtos.BypassProcedureResponse bypassProcedure(RpcController controller,
|
||||
MasterProtos.BypassProcedureRequest request) throws ServiceException {
|
||||
try {
|
||||
List<Boolean> ret =
|
||||
master.getMasterProcedureExecutor().bypassProcedure(request.getProcIdList(),
|
||||
request.getWaitTime(), request.getForce());
|
||||
return MasterProtos.BypassProcedureResponse.newBuilder().addAllBypassed(ret).build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -63,10 +69,13 @@ public class TestHbck {
|
|||
|
||||
private static final TableName TABLE_NAME = TableName.valueOf(TestHbck.class.getSimpleName());
|
||||
|
||||
private static ProcedureExecutor<MasterProcedureEnv> procExec;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("family1"), 5);
|
||||
procExec = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -74,6 +83,45 @@ public class TestHbck {
|
|||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
public static class SuspendProcedure extends
|
||||
ProcedureTestingUtility.NoopProcedure<MasterProcedureEnv> implements TableProcedureInterface {
|
||||
public SuspendProcedure() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(final MasterProcedureEnv env)
|
||||
throws ProcedureSuspendedException {
|
||||
// Always suspend the procedure
|
||||
throw new ProcedureSuspendedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTableName() {
|
||||
return TABLE_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableOperationType getTableOperationType() {
|
||||
return TableOperationType.READ;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBypassProcedure() throws Exception {
|
||||
// SuspendProcedure
|
||||
final SuspendProcedure proc = new SuspendProcedure();
|
||||
long procId = procExec.submitProcedure(proc);
|
||||
Thread.sleep(500);
|
||||
|
||||
//bypass the procedure
|
||||
List<Long> pids = Arrays.<Long>asList(procId);
|
||||
List<Boolean> results = TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false);
|
||||
assertTrue("Failed to by pass procedure!", results.get(0));
|
||||
TEST_UTIL.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||
LOG.info("{} finished", proc);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetTableStateInMeta() throws IOException {
|
||||
Hbck hbck = TEST_UTIL.getHbck();
|
||||
|
|
Loading…
Reference in New Issue