Revert "Revert "HBASE-21213 [hbck2] bypass leaves behind state in RegionStates when assign/unassign""
This reverts commit b96905d1df
.
i.e. a revert of a revert so a reapplication!
Revert so I can add signed-off-by....
Signed-off-by: Allan Yang <allan163@apache.org>
This commit is contained in:
parent
b96905d1df
commit
2174461cf7
|
@ -102,11 +102,12 @@ public class HBaseHbck implements Hbck {
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<Long> assigns(List<String> encodedRegionNames) throws IOException {
|
||||
public List<Long> assigns(List<String> encodedRegionNames, boolean override)
|
||||
throws IOException {
|
||||
try {
|
||||
MasterProtos.AssignsResponse response =
|
||||
this.hbck.assigns(rpcControllerFactory.newController(),
|
||||
RequestConverter.toAssignRegionsRequest(encodedRegionNames));
|
||||
RequestConverter.toAssignRegionsRequest(encodedRegionNames, override));
|
||||
return response.getPidList();
|
||||
} catch (ServiceException se) {
|
||||
LOG.debug(toCommaDelimitedString(encodedRegionNames), se);
|
||||
|
@ -115,11 +116,12 @@ public class HBaseHbck implements Hbck {
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<Long> unassigns(List<String> encodedRegionNames) throws IOException {
|
||||
public List<Long> unassigns(List<String> encodedRegionNames, boolean override)
|
||||
throws IOException {
|
||||
try {
|
||||
MasterProtos.UnassignsResponse response =
|
||||
this.hbck.unassigns(rpcControllerFactory.newController(),
|
||||
RequestConverter.toUnassignRegionsRequest(encodedRegionNames));
|
||||
RequestConverter.toUnassignRegionsRequest(encodedRegionNames, override));
|
||||
return response.getPidList();
|
||||
} catch (ServiceException se) {
|
||||
LOG.debug(toCommaDelimitedString(encodedRegionNames), se);
|
||||
|
@ -132,7 +134,8 @@ public class HBaseHbck implements Hbck {
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<Boolean> bypassProcedure(List<Long> pids, long waitTime, boolean force)
|
||||
public List<Boolean> bypassProcedure(List<Long> pids, long waitTime, boolean override,
|
||||
boolean recursive)
|
||||
throws IOException {
|
||||
MasterProtos.BypassProcedureResponse response = ProtobufUtil.call(
|
||||
new Callable<MasterProtos.BypassProcedureResponse>() {
|
||||
|
@ -141,7 +144,7 @@ public class HBaseHbck implements Hbck {
|
|||
try {
|
||||
return hbck.bypassProcedure(rpcControllerFactory.newController(),
|
||||
MasterProtos.BypassProcedureRequest.newBuilder().addAllProcId(pids).
|
||||
setWaitTime(waitTime).setForce(force).build());
|
||||
setWaitTime(waitTime).setOverride(override).setRecursive(recursive).build());
|
||||
} catch (Throwable t) {
|
||||
LOG.error(pids.stream().map(i -> i.toString()).
|
||||
collect(Collectors.joining(", ")), t);
|
||||
|
|
|
@ -28,11 +28,14 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
/**
|
||||
* Hbck fixup tool APIs. Obtain an instance from {@link ClusterConnection#getHbck()} and call
|
||||
* {@link #close()} when done.
|
||||
* <p>WARNING: the below methods can damage the cluster. For experienced users only.
|
||||
* <p>WARNING: the below methods can damage the cluster. It may leave the cluster in an
|
||||
* indeterminate state, e.g. region not assigned, or some hdfs files left behind. After running
|
||||
* any of the below, operators may have to do some clean up on hdfs or schedule some assign
|
||||
* procedures to get regions back online. DO AT YOUR OWN RISK. For experienced users only.
|
||||
*
|
||||
* @see ConnectionFactory
|
||||
* @see ClusterConnection
|
||||
* @since 2.2.0
|
||||
* @since 2.0.2, 2.1.1
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
|
||||
public interface Hbck extends Abortable, Closeable {
|
||||
|
@ -49,22 +52,38 @@ public interface Hbck extends Abortable, Closeable {
|
|||
* -- good if many Regions to online -- and it will schedule the assigns even in the case where
|
||||
* Master is initializing (as long as the ProcedureExecutor is up). Does NOT call Coprocessor
|
||||
* hooks.
|
||||
* @param override You need to add the override for case where a region has previously been
|
||||
* bypassed. When a Procedure has been bypassed, a Procedure will have completed
|
||||
* but no other Procedure will be able to make progress on the target entity
|
||||
* (intentionally). This override flag will override this fencing mechanism.
|
||||
* @param encodedRegionNames Region encoded names; e.g. 1588230740 is the hard-coded encoding
|
||||
* for hbase:meta region and de00010733901a05f5a2a3a382e27dd4 is an
|
||||
* example of what a random user-space encoded Region name looks like.
|
||||
*/
|
||||
List<Long> assigns(List<String> encodedRegionNames) throws IOException;
|
||||
List<Long> assigns(List<String> encodedRegionNames, boolean override) throws IOException;
|
||||
|
||||
default List<Long> assigns(List<String> encodedRegionNames) throws IOException {
|
||||
return assigns(encodedRegionNames, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Like {@link Admin#unassign(byte[], boolean)} but 'raw' in that it can do more than one Region
|
||||
* at a time -- good if many Regions to offline -- and it will schedule the assigns even in the
|
||||
* case where Master is initializing (as long as the ProcedureExecutor is up). Does NOT call
|
||||
* Coprocessor hooks.
|
||||
* @param override You need to add the override for case where a region has previously been
|
||||
* bypassed. When a Procedure has been bypassed, a Procedure will have completed
|
||||
* but no other Procedure will be able to make progress on the target entity
|
||||
* (intentionally). This override flag will override this fencing mechanism.
|
||||
* @param encodedRegionNames Region encoded names; e.g. 1588230740 is the hard-coded encoding
|
||||
* for hbase:meta region and de00010733901a05f5a2a3a382e27dd4 is an
|
||||
* example of what a random user-space encoded Region name looks like.
|
||||
*/
|
||||
List<Long> unassigns(List<String> encodedRegionNames) throws IOException;
|
||||
List<Long> unassigns(List<String> encodedRegionNames, boolean override) throws IOException;
|
||||
|
||||
default List<Long> unassigns(List<String> encodedRegionNames) throws IOException {
|
||||
return unassigns(encodedRegionNames, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Bypass specified procedure and move it to completion. Procedure is marked completed but
|
||||
|
@ -73,9 +92,12 @@ public interface Hbck extends Abortable, Closeable {
|
|||
*
|
||||
* @param pids of procedures to complete.
|
||||
* @param waitTime wait time in ms for acquiring lock for a procedure
|
||||
* @param force if force set to true, we will bypass the procedure even if it is executing.
|
||||
* @param override if override set to true, we will bypass the procedure even if it is executing.
|
||||
* This is for procedures which can't break out during execution (bugs?).
|
||||
* @param recursive If set, if a parent procedure, we will find and bypass children and then
|
||||
* the parent procedure (Dangerous but useful in case where child procedure has been 'lost').
|
||||
* @return true if procedure is marked for bypass successfully, false otherwise
|
||||
*/
|
||||
List<Boolean> bypassProcedure(List<Long> pids, long waitTime, boolean force) throws IOException;
|
||||
List<Boolean> bypassProcedure(List<Long> pids, long waitTime, boolean override, boolean recursive)
|
||||
throws IOException;
|
||||
}
|
||||
|
|
|
@ -1883,16 +1883,18 @@ public final class RequestConverter {
|
|||
|
||||
// HBCK2
|
||||
public static MasterProtos.AssignsRequest toAssignRegionsRequest(
|
||||
List<String> encodedRegionNames) {
|
||||
List<String> encodedRegionNames, boolean override) {
|
||||
MasterProtos.AssignsRequest.Builder b = MasterProtos.AssignsRequest.newBuilder();
|
||||
return b.addAllRegion(toEncodedRegionNameRegionSpecifiers(encodedRegionNames)).build();
|
||||
return b.addAllRegion(toEncodedRegionNameRegionSpecifiers(encodedRegionNames)).
|
||||
setOverride(override).build();
|
||||
}
|
||||
|
||||
public static MasterProtos.UnassignsRequest toUnassignRegionsRequest(
|
||||
List<String> encodedRegionNames) {
|
||||
List<String> encodedRegionNames, boolean override) {
|
||||
MasterProtos.UnassignsRequest.Builder b =
|
||||
MasterProtos.UnassignsRequest.newBuilder();
|
||||
return b.addAllRegion(toEncodedRegionNameRegionSpecifiers(encodedRegionNames)).build();
|
||||
return b.addAllRegion(toEncodedRegionNameRegionSpecifiers(encodedRegionNames)).
|
||||
setOverride(override).build();
|
||||
}
|
||||
|
||||
private static List<RegionSpecifier> toEncodedRegionNameRegionSpecifiers(
|
||||
|
|
|
@ -145,12 +145,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
private boolean lockedWhenLoading = false;
|
||||
|
||||
/**
|
||||
* Used for force complete of the procedure without
|
||||
* actually doing any logic in the procedure.
|
||||
* Used for override complete of the procedure without actually doing any logic in the procedure.
|
||||
* If bypass is set to true, when executing it will return null when
|
||||
* {@link #doExecute(Object)} to finish the procedure and releasing any locks
|
||||
* it may currently hold.
|
||||
* Bypassing a procedure is not like aborting. Aborting a procedure will trigger
|
||||
* {@link #doExecute(Object)} is called to finish the procedure and release any locks
|
||||
* it may currently hold. The bypass does cleanup around the Procedure as far as the
|
||||
* Procedure framework is concerned. It does not clean any internal state that the
|
||||
* Procedure's themselves may have set. That is for the Procedures to do themselves
|
||||
* when bypass is called. They should override bypass and do their cleanup in the
|
||||
* overridden bypass method (be sure to call the parent bypass to ensure proper
|
||||
* processing).
|
||||
* <p></p>Bypassing a procedure is not like aborting. Aborting a procedure will trigger
|
||||
* a rollback. And since the {@link #abort(Object)} method is overrideable
|
||||
* Some procedures may have chosen to ignore the aborting.
|
||||
*/
|
||||
|
@ -175,12 +179,15 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
}
|
||||
|
||||
/**
|
||||
* set the bypass to true
|
||||
* Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean)} for now,
|
||||
* DO NOT use this method alone, since we can't just bypass
|
||||
* one single procedure. We need to bypass its ancestor too. So making it package private
|
||||
* Set the bypass to true.
|
||||
* Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)}.
|
||||
* DO NOT use this method alone, since we can't just bypass one single procedure. We need to
|
||||
* bypass its ancestor too. If your Procedure has set state, it needs to undo it in here.
|
||||
* @param env Current environment. May be null because of context; e.g. pretty-printing
|
||||
* procedure WALs where there is no 'environment' (and where Procedures that require
|
||||
* an 'environment' won't be run.
|
||||
*/
|
||||
void bypass() {
|
||||
protected void bypass(TEnvironment env) {
|
||||
this.bypass = true;
|
||||
}
|
||||
|
||||
|
@ -706,7 +713,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
/**
|
||||
* Will only be called when loading procedures from procedure store, where we need to record
|
||||
* whether the procedure has already held a lock. Later we will call
|
||||
* {@link #doAcquireLock(Object)} to actually acquire the lock.
|
||||
* {@link #doAcquireLock(Object, ProcedureStore)} to actually acquire the lock.
|
||||
*/
|
||||
final void lockedWhenLoading() {
|
||||
this.lockedWhenLoading = true;
|
||||
|
|
|
@ -306,7 +306,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
private Configuration conf;
|
||||
|
||||
/**
|
||||
* Created in the {@link #start(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
|
||||
* Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
|
||||
* resource handling rather than observing in a #join is unexpected).
|
||||
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
|
||||
* (Should be ok).
|
||||
|
@ -314,7 +314,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
private ThreadGroup threadGroup;
|
||||
|
||||
/**
|
||||
* Created in the {@link #start(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
|
||||
* Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
|
||||
* resource handling rather than observing in a #join is unexpected).
|
||||
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
|
||||
* (Should be ok).
|
||||
|
@ -322,7 +322,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
private CopyOnWriteArrayList<WorkerThread> workerThreads;
|
||||
|
||||
/**
|
||||
* Created in the {@link #start(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
|
||||
* Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
|
||||
* resource handling rather than observing in a #join is unexpected).
|
||||
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
|
||||
* (Should be ok).
|
||||
|
@ -966,7 +966,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
* Bypass a procedure. If the procedure is set to bypass, all the logic in
|
||||
* execute/rollback will be ignored and it will return success, whatever.
|
||||
* It is used to recover buggy stuck procedures, releasing the lock resources
|
||||
* and letting other procedures to run. Bypassing one procedure (and its ancestors will
|
||||
* and letting other procedures run. Bypassing one procedure (and its ancestors will
|
||||
* be bypassed automatically) may leave the cluster in a middle state, e.g. region
|
||||
* not assigned, or some hdfs files left behind. After getting rid of those stuck procedures,
|
||||
* the operators may have to do some clean up on hdfs or schedule some assign procedures
|
||||
|
@ -993,34 +993,38 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
* there. We need to restart the master after bypassing, and letting the problematic
|
||||
* procedure to execute wth bypass=true, so in that condition, the procedure can be
|
||||
* successfully bypassed.
|
||||
* @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
|
||||
* @return true if bypass success
|
||||
* @throws IOException IOException
|
||||
*/
|
||||
public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force)
|
||||
public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,
|
||||
boolean recursive)
|
||||
throws IOException {
|
||||
List<Boolean> result = new ArrayList<Boolean>(pids.size());
|
||||
for(long pid: pids) {
|
||||
result.add(bypassProcedure(pid, lockWait, force));
|
||||
result.add(bypassProcedure(pid, lockWait, force, recursive));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
boolean bypassProcedure(long pid, long lockWait, boolean force) throws IOException {
|
||||
Procedure<TEnvironment> procedure = getProcedure(pid);
|
||||
boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)
|
||||
throws IOException {
|
||||
final Procedure<TEnvironment> procedure = getProcedure(pid);
|
||||
if (procedure == null) {
|
||||
LOG.debug("Procedure with id={} does not exist, skipping bypass", pid);
|
||||
LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
|
||||
return false;
|
||||
}
|
||||
|
||||
LOG.debug("Begin bypass {} with lockWait={}, force={}", procedure, lockWait, force);
|
||||
LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}",
|
||||
procedure, lockWait, override, recursive);
|
||||
IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
|
||||
if (lockEntry == null && !force) {
|
||||
if (lockEntry == null && !override) {
|
||||
LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}",
|
||||
lockWait, procedure, force);
|
||||
lockWait, procedure, override);
|
||||
return false;
|
||||
} else if (lockEntry == null) {
|
||||
LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}",
|
||||
lockWait, procedure, force);
|
||||
lockWait, procedure, override);
|
||||
}
|
||||
try {
|
||||
// check whether the procedure is already finished
|
||||
|
@ -1030,9 +1034,30 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
}
|
||||
|
||||
if (procedure.hasChildren()) {
|
||||
if (recursive) {
|
||||
// EXPENSIVE. Checks each live procedure of which there could be many!!!
|
||||
// Is there another way to get children of a procedure?
|
||||
LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());
|
||||
this.procedures.forEachValue(1 /*Single-threaded*/,
|
||||
// Transformer
|
||||
v -> {
|
||||
return v.getParentProcId() == procedure.getProcId()? v: null;
|
||||
},
|
||||
// Consumer
|
||||
v -> {
|
||||
boolean result = false;
|
||||
IOException ioe = null;
|
||||
try {
|
||||
result = bypassProcedure(v.getProcId(), lockWait, override, recursive);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
LOG.debug("{} has children, skipping bypass", procedure);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// If the procedure has no parent or no child, we are safe to bypass it in whatever state
|
||||
if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
|
||||
|
@ -1041,6 +1066,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
|
||||
+ "(with no parent), {}",
|
||||
procedure);
|
||||
// Question: how is the bypass done here?
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1050,7 +1076,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
Procedure current = procedure;
|
||||
while (current != null) {
|
||||
LOG.debug("Bypassing {}", current);
|
||||
current.bypass();
|
||||
current.bypass(getEnvironment());
|
||||
store.update(procedure);
|
||||
long parentID = current.getParentProcId();
|
||||
current = getProcedure(parentID);
|
||||
|
|
|
@ -272,7 +272,7 @@ public final class ProcedureUtil {
|
|||
}
|
||||
|
||||
if (proto.getBypass()) {
|
||||
proc.bypass();
|
||||
proc.bypass(null);
|
||||
}
|
||||
|
||||
ProcedureStateSerializer serializer = null;
|
||||
|
|
|
@ -21,7 +21,6 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
|
||||
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
||||
|
||||
|
@ -37,7 +36,6 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
|||
* their stacks traces and messages overridden to reflect the original 'remote' exception.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
@SuppressWarnings("serial")
|
||||
public class RemoteProcedureException extends ProcedureException {
|
||||
|
||||
|
@ -74,6 +72,10 @@ public class RemoteProcedureException extends ProcedureException {
|
|||
return new Exception(cause);
|
||||
}
|
||||
|
||||
// NOTE: Does not throw DoNotRetryIOE because it does not
|
||||
// have access (DNRIOE is in the client module). Use
|
||||
// MasterProcedureUtil.unwrapRemoteIOException if need to
|
||||
// throw DNRIOE.
|
||||
public IOException unwrapRemoteIOException() {
|
||||
final Exception cause = unwrapRemoteException();
|
||||
if (cause instanceof IOException) {
|
||||
|
|
|
@ -87,7 +87,7 @@ public class TestProcedureBypass {
|
|||
long id = procExecutor.submitProcedure(proc);
|
||||
Thread.sleep(500);
|
||||
//bypass the procedure
|
||||
assertTrue(procExecutor.bypassProcedure(id, 30000, false));
|
||||
assertTrue(procExecutor.bypassProcedure(id, 30000, false, false));
|
||||
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||
LOG.info("{} finished", proc);
|
||||
}
|
||||
|
@ -98,7 +98,7 @@ public class TestProcedureBypass {
|
|||
long id = procExecutor.submitProcedure(proc);
|
||||
Thread.sleep(500);
|
||||
//bypass the procedure
|
||||
assertTrue(procExecutor.bypassProcedure(id, 1000, true));
|
||||
assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
|
||||
//Since the procedure is stuck there, we need to restart the executor to recovery.
|
||||
ProcedureTestingUtility.restart(procExecutor);
|
||||
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||
|
@ -114,12 +114,24 @@ public class TestProcedureBypass {
|
|||
.size() > 0);
|
||||
SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream()
|
||||
.filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
|
||||
assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false));
|
||||
assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false, false));
|
||||
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||
LOG.info("{} finished", proc);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testBypassingProcedureWithParentRecursive() throws Exception {
|
||||
final RootProcedure proc = new RootProcedure();
|
||||
long rootId = procExecutor.submitProcedure(proc);
|
||||
htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
|
||||
.filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList())
|
||||
.size() > 0);
|
||||
SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream()
|
||||
.filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
|
||||
assertTrue(procExecutor.bypassProcedure(rootId, 1000, false, true));
|
||||
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||
LOG.info("{} finished", proc);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
|
@ -179,7 +191,4 @@ public class TestProcedureBypass {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -99,6 +99,7 @@ message MergeTableRegionsResponse {
|
|||
|
||||
message AssignRegionRequest {
|
||||
required RegionSpecifier region = 1;
|
||||
optional bool override = 2 [default = false];
|
||||
}
|
||||
|
||||
message AssignRegionResponse {
|
||||
|
@ -1005,6 +1006,7 @@ message SetTableStateInMetaRequest {
|
|||
// Region at a time.
|
||||
message AssignsRequest {
|
||||
repeated RegionSpecifier region = 1;
|
||||
optional bool override = 2 [default = false];
|
||||
}
|
||||
|
||||
/** Like Admin's AssignRegionResponse except it can
|
||||
|
@ -1019,6 +1021,7 @@ message AssignsResponse {
|
|||
*/
|
||||
message UnassignsRequest {
|
||||
repeated RegionSpecifier region = 1;
|
||||
optional bool override = 2 [default = false];
|
||||
}
|
||||
|
||||
/** Like Admin's UnassignRegionResponse except it can
|
||||
|
@ -1031,7 +1034,8 @@ message UnassignsResponse {
|
|||
message BypassProcedureRequest {
|
||||
repeated uint64 proc_id = 1;
|
||||
optional uint64 waitTime = 2; // wait time in ms to acquire lock on a procedure
|
||||
optional bool force = 3; // if true, procedure is marked for bypass even if its executing
|
||||
optional bool override = 3 [default = false]; // if true, procedure is marked for bypass even if its executing
|
||||
optional bool recursive = 4;
|
||||
}
|
||||
|
||||
message BypassProcedureResponse {
|
||||
|
|
|
@ -330,6 +330,7 @@ message AssignRegionStateData {
|
|||
optional ServerName target_server = 4;
|
||||
// Current attempt index used for expotential backoff when stuck
|
||||
optional int32 attempt = 5;
|
||||
optional bool override = 6 [default = false];
|
||||
}
|
||||
|
||||
message UnassignRegionStateData {
|
||||
|
@ -341,6 +342,7 @@ message UnassignRegionStateData {
|
|||
// This is the server currently hosting the Region, the
|
||||
// server we will send the unassign rpc too.
|
||||
optional ServerName hosting_server = 5;
|
||||
// We hijacked an old param named 'force' and use it as 'override'.
|
||||
optional bool force = 4 [default = false];
|
||||
optional bool remove_after_unassigning = 6 [default = false];
|
||||
// Current attempt index used for expotential backoff when stuck
|
||||
|
|
|
@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
|
|||
import org.apache.hadoop.hbase.master.ServerListener;
|
||||
import org.apache.hadoop.hbase.master.TableStateManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
||||
|
@ -873,7 +874,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
|
||||
if (result != null && result.isFailed()) {
|
||||
throw new IOException("Failed to create group table. " +
|
||||
result.getException().unwrapRemoteIOException());
|
||||
MasterProcedureUtil.unwrapRemoteIOException(result));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.master.LoadBalancer;
|
|||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
|
||||
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
|
||||
|
@ -569,6 +570,8 @@ public class TestRSGroupBasedLoadBalancer {
|
|||
Mockito.when(services.getTableDescriptors()).thenReturn(tds);
|
||||
AssignmentManager am = Mockito.mock(AssignmentManager.class);
|
||||
Mockito.when(services.getAssignmentManager()).thenReturn(am);
|
||||
RegionStates rss = Mockito.mock(RegionStates.class);
|
||||
Mockito.when(am.getRegionStates()).thenReturn(rss);
|
||||
return services;
|
||||
}
|
||||
|
||||
|
|
|
@ -978,7 +978,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// as procedures run -- in particular SCPs for crashed servers... One should put up hbase:meta
|
||||
// if it is down. It may take a while to come online. So, wait here until meta if for sure
|
||||
// available. Thats what waitUntilMetaOnline does.
|
||||
if (!waitUntilMetaOnline()) {
|
||||
if (!waitForMetaOnline()) {
|
||||
return;
|
||||
}
|
||||
this.assignmentManager.joinCluster();
|
||||
|
@ -1010,7 +1010,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// Here we expect hbase:namespace to be online. See inside initClusterSchemaService.
|
||||
// TODO: Fix this. Namespace is a pain being a sort-of system table. Fold it in to hbase:meta.
|
||||
// isNamespace does like isMeta and waits until namespace is onlined before allowing progress.
|
||||
if (!waitUntilNamespaceOnline()) {
|
||||
if (!waitForNamespaceOnline()) {
|
||||
return;
|
||||
}
|
||||
status.setStatus("Starting cluster schema service");
|
||||
|
@ -1094,7 +1094,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
* and we will hold here until operator intervention.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public boolean waitUntilMetaOnline() throws InterruptedException {
|
||||
public boolean waitForMetaOnline() throws InterruptedException {
|
||||
return isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO);
|
||||
}
|
||||
|
||||
|
@ -1135,7 +1135,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
* @return True if namespace table is up/online.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public boolean waitUntilNamespaceOnline() throws InterruptedException {
|
||||
public boolean waitForNamespaceOnline() throws InterruptedException {
|
||||
List<RegionInfo> ris = this.assignmentManager.getRegionStates().
|
||||
getRegionsOfTable(TableName.NAMESPACE_TABLE_NAME);
|
||||
if (ris.isEmpty()) {
|
||||
|
|
|
@ -29,7 +29,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
|
||||
|
@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.MasterSwitchType;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
|
@ -595,7 +594,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
BalanceRequest request) throws ServiceException {
|
||||
try {
|
||||
return BalanceResponse.newBuilder().setBalancerRan(master.balance(
|
||||
request.hasForce() ? request.getForce() : false)).build();
|
||||
request.hasForce()? request.getForce(): false)).build();
|
||||
} catch (IOException ex) {
|
||||
throw new ServiceException(ex);
|
||||
}
|
||||
|
@ -1181,7 +1180,8 @@ public class MasterRpcServices extends RSRpcServices
|
|||
if (executor.isFinished(procId)) {
|
||||
builder.setState(GetProcedureResultResponse.State.FINISHED);
|
||||
if (result.isFailed()) {
|
||||
IOException exception = result.getException().unwrapRemoteIOException();
|
||||
IOException exception =
|
||||
MasterProcedureUtil.unwrapRemoteIOException(result);
|
||||
builder.setException(ForeignExceptionUtil.toProtoForeignException(exception));
|
||||
}
|
||||
byte[] resultData = result.getResult();
|
||||
|
@ -2335,14 +2335,15 @@ public class MasterRpcServices extends RSRpcServices
|
|||
* Submit the Procedure that gets created by <code>f</code>
|
||||
* @return pid of the submitted Procedure.
|
||||
*/
|
||||
private long submitProcedure(HBaseProtos.RegionSpecifier rs, Function<RegionInfo, Procedure> f)
|
||||
private long submitProcedure(HBaseProtos.RegionSpecifier rs, boolean override,
|
||||
BiFunction<RegionInfo, Boolean, Procedure> f)
|
||||
throws UnknownRegionException {
|
||||
RegionInfo ri = getRegionInfo(rs);
|
||||
long pid = Procedure.NO_PROC_ID;
|
||||
if (ri == null) {
|
||||
LOG.warn("No RegionInfo found to match {}", rs);
|
||||
} else {
|
||||
pid = this.master.getMasterProcedureExecutor().submitProcedure(f.apply(ri));
|
||||
pid = this.master.getMasterProcedureExecutor().submitProcedure(f.apply(ri, override));
|
||||
}
|
||||
return pid;
|
||||
}
|
||||
|
@ -2362,9 +2363,10 @@ public class MasterRpcServices extends RSRpcServices
|
|||
MasterProtos.AssignsResponse.Builder responseBuilder =
|
||||
MasterProtos.AssignsResponse.newBuilder();
|
||||
try {
|
||||
boolean override = request.getOverride();
|
||||
for (HBaseProtos.RegionSpecifier rs: request.getRegionList()) {
|
||||
long pid = submitProcedure(rs,
|
||||
r -> this.master.getAssignmentManager().createAssignProcedure(r));
|
||||
long pid = submitProcedure(rs, override,
|
||||
(r, b) -> this.master.getAssignmentManager().createAssignProcedure(r, b));
|
||||
responseBuilder.addPid(pid);
|
||||
}
|
||||
return responseBuilder.build();
|
||||
|
@ -2388,9 +2390,10 @@ public class MasterRpcServices extends RSRpcServices
|
|||
MasterProtos.UnassignsResponse.Builder responseBuilder =
|
||||
MasterProtos.UnassignsResponse.newBuilder();
|
||||
try {
|
||||
boolean override = request.getOverride();
|
||||
for (HBaseProtos.RegionSpecifier rs: request.getRegionList()) {
|
||||
long pid = submitProcedure(rs,
|
||||
ri -> this.master.getAssignmentManager().createUnassignProcedure(ri));
|
||||
long pid = submitProcedure(rs, override,
|
||||
(r, b) -> this.master.getAssignmentManager().createUnassignProcedure(r, b));
|
||||
responseBuilder.addPid(pid);
|
||||
}
|
||||
return responseBuilder.build();
|
||||
|
@ -2416,7 +2419,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
try {
|
||||
List<Boolean> ret =
|
||||
master.getMasterProcedureExecutor().bypassProcedure(request.getProcIdList(),
|
||||
request.getWaitTime(), request.getForce());
|
||||
request.getWaitTime(), request.getOverride(), request.getRecursive());
|
||||
return MasterProtos.BypassProcedureResponse.newBuilder().addAllBypassed(ret).build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
|
|
|
@ -99,12 +99,16 @@ public class AssignProcedure extends RegionTransitionProcedure {
|
|||
}
|
||||
|
||||
public AssignProcedure(final RegionInfo regionInfo) {
|
||||
super(regionInfo);
|
||||
this.targetServer = null;
|
||||
this(regionInfo, null);
|
||||
}
|
||||
|
||||
public AssignProcedure(final RegionInfo regionInfo, final ServerName destinationServer) {
|
||||
super(regionInfo);
|
||||
this(regionInfo, destinationServer, false);
|
||||
}
|
||||
|
||||
public AssignProcedure(final RegionInfo regionInfo, final ServerName destinationServer,
|
||||
boolean override) {
|
||||
super(regionInfo, override);
|
||||
this.targetServer = destinationServer;
|
||||
}
|
||||
|
||||
|
@ -138,6 +142,9 @@ public class AssignProcedure extends RegionTransitionProcedure {
|
|||
if (getAttempt() > 0) {
|
||||
state.setAttempt(getAttempt());
|
||||
}
|
||||
if (isOverride()) {
|
||||
state.setOverride(isOverride());
|
||||
}
|
||||
serializer.serialize(state.build());
|
||||
}
|
||||
|
||||
|
@ -148,6 +155,7 @@ public class AssignProcedure extends RegionTransitionProcedure {
|
|||
setTransitionState(state.getTransitionState());
|
||||
setRegionInfo(ProtobufUtil.toRegionInfo(state.getRegionInfo()));
|
||||
forceNewPlan = state.getForceNewPlan();
|
||||
setOverride(state.getOverride());
|
||||
if (state.hasTargetServer()) {
|
||||
this.targetServer = ProtobufUtil.toServerName(state.getTargetServer());
|
||||
}
|
||||
|
|
|
@ -677,26 +677,37 @@ public class AssignmentManager implements ServerListener {
|
|||
* Called by things like DisableTableProcedure to get a list of UnassignProcedure
|
||||
* to unassign the regions of the table.
|
||||
*/
|
||||
public UnassignProcedure[] createUnassignProcedures(final TableName tableName) {
|
||||
return createUnassignProcedures(regionStates.getTableRegionStateNodes(tableName));
|
||||
public AssignProcedure createAssignProcedure(final RegionInfo regionInfo) {
|
||||
return createAssignProcedure(regionInfo, null, false);
|
||||
}
|
||||
|
||||
public AssignProcedure createAssignProcedure(final RegionInfo regionInfo) {
|
||||
AssignProcedure proc = new AssignProcedure(regionInfo);
|
||||
proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
|
||||
return proc;
|
||||
public AssignProcedure createAssignProcedure(final RegionInfo regionInfo, boolean override) {
|
||||
return createAssignProcedure(regionInfo, null, override);
|
||||
}
|
||||
|
||||
public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
|
||||
final ServerName targetServer) {
|
||||
AssignProcedure proc = new AssignProcedure(regionInfo, targetServer);
|
||||
ServerName targetServer) {
|
||||
return createAssignProcedure(regionInfo, targetServer, false);
|
||||
}
|
||||
|
||||
public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
|
||||
final ServerName targetServer, boolean override) {
|
||||
AssignProcedure proc = new AssignProcedure(regionInfo, targetServer, override);
|
||||
proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
|
||||
return proc;
|
||||
}
|
||||
|
||||
public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo) {
|
||||
return createUnassignProcedure(regionInfo, null, false);
|
||||
}
|
||||
|
||||
public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
|
||||
boolean override) {
|
||||
return createUnassignProcedure(regionInfo, null, override);
|
||||
}
|
||||
|
||||
public UnassignProcedure[] createUnassignProcedures(final TableName tableName) {
|
||||
return createUnassignProcedures(regionStates.getTableRegionStateNodes(tableName));
|
||||
}
|
||||
|
||||
UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
|
||||
|
|
|
@ -111,6 +111,12 @@ public abstract class RegionTransitionProcedure
|
|||
*/
|
||||
private RegionInfo regionInfo;
|
||||
|
||||
/**
|
||||
* this data member must also be persisted.
|
||||
* @see #regionInfo
|
||||
*/
|
||||
private boolean override;
|
||||
|
||||
/**
|
||||
* Like {@link #regionInfo}, the expectation is that subclasses persist the value of this
|
||||
* data member. It is used doing backoff when Procedure gets stuck.
|
||||
|
@ -120,8 +126,9 @@ public abstract class RegionTransitionProcedure
|
|||
// Required by the Procedure framework to create the procedure on replay
|
||||
public RegionTransitionProcedure() {}
|
||||
|
||||
public RegionTransitionProcedure(final RegionInfo regionInfo) {
|
||||
public RegionTransitionProcedure(final RegionInfo regionInfo, boolean override) {
|
||||
this.regionInfo = regionInfo;
|
||||
this.override = override;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -134,12 +141,24 @@ public abstract class RegionTransitionProcedure
|
|||
* {@link #deserializeStateData(ProcedureStateSerializer)} method. Expectation is that
|
||||
* subclasses will persist `regioninfo` in their
|
||||
* {@link #serializeStateData(ProcedureStateSerializer)} method and then restore `regionInfo` on
|
||||
* deserialization by calling.
|
||||
* deserialization by calling this.
|
||||
*/
|
||||
protected void setRegionInfo(final RegionInfo regionInfo) {
|
||||
this.regionInfo = regionInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* This setter is for subclasses to call in their
|
||||
* {@link #deserializeStateData(ProcedureStateSerializer)} method. Expectation is that
|
||||
* subclasses will persist `override` in their
|
||||
* {@link #serializeStateData(ProcedureStateSerializer)} method and then restore `override` on
|
||||
* deserialization by calling this.
|
||||
*/
|
||||
protected void setOverride(boolean override) {
|
||||
this.override = override;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This setter is for subclasses to call in their
|
||||
* {@link #deserializeStateData(ProcedureStateSerializer)} method.
|
||||
|
@ -170,6 +189,11 @@ public abstract class RegionTransitionProcedure
|
|||
sb.append(getTableName());
|
||||
sb.append(", region=");
|
||||
sb.append(getRegionInfo() == null? null: getRegionInfo().getEncodedName());
|
||||
if (isOverride()) {
|
||||
// Only log if set.
|
||||
sb.append(", override=");
|
||||
sb.append(isOverride());
|
||||
}
|
||||
}
|
||||
|
||||
public RegionStateNode getRegionState(final MasterProcedureEnv env) {
|
||||
|
@ -308,13 +332,20 @@ public abstract class RegionTransitionProcedure
|
|||
final AssignmentManager am = env.getAssignmentManager();
|
||||
final RegionStateNode regionNode = getRegionState(env);
|
||||
if (!am.addRegionInTransition(regionNode, this)) {
|
||||
String msg = String.format(
|
||||
"There is already another procedure running on this region this=%s owner=%s",
|
||||
this, regionNode.getProcedure());
|
||||
LOG.warn(msg + " " + this + "; " + regionNode.toShortString());
|
||||
if (this.isOverride()) {
|
||||
LOG.info("{} owned by pid={}, OVERRIDDEN by 'this' (pid={}, override=true).",
|
||||
regionNode.getRegionInfo().getEncodedName(),
|
||||
regionNode.getProcedure().getProcId(), getProcId());
|
||||
regionNode.unsetProcedure(regionNode.getProcedure());
|
||||
} else {
|
||||
String msg = String.format("%s owned by pid=%d, CANNOT run 'this' (pid=%d).",
|
||||
regionNode.getRegionInfo().getEncodedName(),
|
||||
regionNode.getProcedure().getProcId(), getProcId());
|
||||
LOG.warn(msg);
|
||||
setAbortFailure(getClass().getSimpleName(), msg);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
try {
|
||||
boolean retry;
|
||||
do {
|
||||
|
@ -425,8 +456,12 @@ public abstract class RegionTransitionProcedure
|
|||
// TODO: Revisit this and move it to the executor
|
||||
if (env.getProcedureScheduler().waitRegion(this, getRegionInfo())) {
|
||||
try {
|
||||
LOG.debug(LockState.LOCK_EVENT_WAIT + " pid=" + getProcId() + " " +
|
||||
// Enable TRACE on this class to see lock dump. Can be really large when cluster is big
|
||||
// or big tables being enabled/disabled.
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("{} pid={} {}", LockState.LOCK_EVENT_WAIT, getProcId(),
|
||||
env.getProcedureScheduler().dumpLocks());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// ignore, just for logging
|
||||
}
|
||||
|
@ -469,4 +504,23 @@ public abstract class RegionTransitionProcedure
|
|||
// should not be called for region operation until we modified the open/close region procedure
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void bypass(MasterProcedureEnv env) {
|
||||
// This override is just so I can write a note on how bypass is done in
|
||||
// RTP. For RTP procedures -- i.e. assign/unassign -- if bypass is called,
|
||||
// we intentionally do NOT cleanup our state. We leave a reference to the
|
||||
// bypassed Procedure in the RegionStateNode. Doing this makes it so the
|
||||
// RSN is in an odd state. The bypassed Procedure is finished but no one
|
||||
// else can make progress on this RSN entity (see the #execute above where
|
||||
// we check the RSN to see if an already registered procedure and if so,
|
||||
// we exit without proceeding). This is done to intentionally block
|
||||
// subsequent Procedures from running. Only a Procedure with the 'override' flag
|
||||
// set can overwrite the RSN and make progress.
|
||||
super.bypass(env);
|
||||
}
|
||||
|
||||
boolean isOverride() {
|
||||
return this.override;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -84,10 +84,6 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
|||
*/
|
||||
protected volatile ServerName destinationServer;
|
||||
|
||||
// TODO: should this be in a reassign procedure?
|
||||
// ...and keep unassign for 'disable' case?
|
||||
private boolean force;
|
||||
|
||||
/**
|
||||
* Whether deleting the region from in-memory states after unassigning the region.
|
||||
*/
|
||||
|
@ -109,12 +105,11 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
|||
}
|
||||
|
||||
public UnassignProcedure(final RegionInfo regionInfo, final ServerName hostingServer,
|
||||
final ServerName destinationServer, final boolean force,
|
||||
final ServerName destinationServer, final boolean override,
|
||||
final boolean removeAfterUnassigning) {
|
||||
super(regionInfo);
|
||||
super(regionInfo, override);
|
||||
this.hostingServer = hostingServer;
|
||||
this.destinationServer = destinationServer;
|
||||
this.force = force;
|
||||
this.removeAfterUnassigning = removeAfterUnassigning;
|
||||
|
||||
// we don't need REGION_TRANSITION_QUEUE, we jump directly to sending the request
|
||||
|
@ -147,7 +142,7 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
|||
if (this.destinationServer != null) {
|
||||
state.setDestinationServer(ProtobufUtil.toServerName(destinationServer));
|
||||
}
|
||||
if (force) {
|
||||
if (isOverride()) {
|
||||
state.setForce(true);
|
||||
}
|
||||
if (removeAfterUnassigning) {
|
||||
|
@ -167,7 +162,8 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
|||
setTransitionState(state.getTransitionState());
|
||||
setRegionInfo(ProtobufUtil.toRegionInfo(state.getRegionInfo()));
|
||||
this.hostingServer = ProtobufUtil.toServerName(state.getHostingServer());
|
||||
force = state.getForce();
|
||||
// The 'force' flag is the override flag in unassign.
|
||||
setOverride(state.getForce());
|
||||
if (state.hasDestinationServer()) {
|
||||
this.destinationServer = ProtobufUtil.toServerName(state.getDestinationServer());
|
||||
}
|
||||
|
|
|
@ -51,6 +51,8 @@ import org.apache.hadoop.hbase.master.LoadBalancer;
|
|||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RackManager;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
|
||||
|
@ -1456,12 +1458,15 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
ServerName oldServerName = entry.getValue();
|
||||
// In the current set of regions even if one has region replica let us go with
|
||||
// getting the entire snapshot
|
||||
if (this.services != null && this.services.getAssignmentManager() != null) { // for tests
|
||||
if (!hasRegionReplica && this.services.getAssignmentManager().getRegionStates()
|
||||
.isReplicaAvailableForRegion(region)) {
|
||||
if (this.services != null) { // for tests
|
||||
AssignmentManager am = this.services.getAssignmentManager();
|
||||
if (am != null) {
|
||||
RegionStates rss = am.getRegionStates();
|
||||
if (!hasRegionReplica && rss.isReplicaAvailableForRegion(region)) {
|
||||
hasRegionReplica = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
List<ServerName> localServers = new ArrayList<>();
|
||||
if (oldServerName != null) {
|
||||
localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
|
||||
|
|
|
@ -19,9 +19,12 @@ package org.apache.hadoop.hbase.master.procedure;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.NonceKey;
|
||||
|
@ -176,4 +179,17 @@ public final class MasterProcedureUtil {
|
|||
public static int getServerPriority(ServerProcedureInterface proc) {
|
||||
return proc.hasMetaTableRegion() ? 100 : 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is a version of unwrapRemoteIOException that can do DoNotRetryIOE.
|
||||
* We need to throw DNRIOE to clients if a failed Procedure else they will
|
||||
* keep trying. The default proc.getException().unwrapRemoteException
|
||||
* doesn't have access to DNRIOE from the procedure2 module.
|
||||
*/
|
||||
public static IOException unwrapRemoteIOException(Procedure proc) {
|
||||
Exception e = proc.getException().unwrapRemoteException();
|
||||
// Do not retry ProcedureExceptions!
|
||||
return (e instanceof ProcedureException)? new DoNotRetryIOException(e):
|
||||
proc.getException().unwrapRemoteIOException();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,7 +69,8 @@ public class ProcedureDescriber {
|
|||
description.put("LAST_UPDATE", new Date(proc.getLastUpdate()));
|
||||
|
||||
if (proc.isFailed()) {
|
||||
description.put("ERRORS", proc.getException().unwrapRemoteIOException().getMessage());
|
||||
description.put("ERRORS",
|
||||
MasterProcedureUtil.unwrapRemoteIOException(proc).getMessage());
|
||||
}
|
||||
description.put("PARAMETERS", parametersToObject(proc));
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ public abstract class ProcedurePrepareLatch {
|
|||
@Override
|
||||
protected void countDown(final Procedure proc) {
|
||||
if (proc.hasException()) {
|
||||
exception = proc.getException().unwrapRemoteIOException();
|
||||
exception = MasterProcedureUtil.unwrapRemoteIOException(proc);
|
||||
}
|
||||
latch.countDown();
|
||||
}
|
||||
|
|
|
@ -160,9 +160,10 @@ public final class ProcedureSyncWait {
|
|||
throw new IOException("The Master is Aborting");
|
||||
}
|
||||
|
||||
if (proc.hasException()) {
|
||||
// If the procedure fails, we should always have an exception captured. Throw it.
|
||||
throw proc.getException().unwrapRemoteIOException();
|
||||
// Needs to be an IOE to get out of here.
|
||||
if (proc.hasException()) {
|
||||
throw MasterProcedureUtil.unwrapRemoteIOException(proc);
|
||||
} else {
|
||||
return proc.getResult();
|
||||
}
|
||||
|
|
|
@ -108,7 +108,7 @@ public class TestMetaTableAccessor {
|
|||
@Test
|
||||
public void testIsMetaWhenAllHealthy() throws InterruptedException {
|
||||
HMaster m = UTIL.getMiniHBaseCluster().getMaster();
|
||||
assertTrue(m.waitUntilMetaOnline());
|
||||
assertTrue(m.waitForMetaOnline());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -117,7 +117,7 @@ public class TestMetaTableAccessor {
|
|||
int index = UTIL.getMiniHBaseCluster().getServerWithMeta();
|
||||
HRegionServer rsWithMeta = UTIL.getMiniHBaseCluster().getRegionServer(index);
|
||||
rsWithMeta.abort("TESTING");
|
||||
assertTrue(m.waitUntilMetaOnline());
|
||||
assertTrue(m.waitForMetaOnline());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -116,7 +116,8 @@ public class TestHbck {
|
|||
|
||||
//bypass the procedure
|
||||
List<Long> pids = Arrays.<Long>asList(procId);
|
||||
List<Boolean> results = TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false);
|
||||
List<Boolean> results =
|
||||
TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false, false);
|
||||
assertTrue("Failed to by pass procedure!", results.get(0));
|
||||
TEST_UTIL.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||
LOG.info("{} finished", proc);
|
||||
|
|
|
@ -0,0 +1,181 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
/**
|
||||
* Tests bypass on a region assign/unassign
|
||||
*/
|
||||
@Category({LargeTests.class})
|
||||
public class TestRegionBypass {
|
||||
private final static Logger LOG = LoggerFactory.getLogger(TestRegionBypass.class);
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionBypass.class);
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private TableName tableName;
|
||||
|
||||
@BeforeClass
|
||||
public static void startCluster() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopCluster() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() throws IOException {
|
||||
this.tableName = TableName.valueOf(this.name.getMethodName());
|
||||
// Create a table. Has one region at least.
|
||||
TEST_UTIL.createTable(this.tableName, Bytes.toBytes("cf"));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBypass() throws IOException {
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
List<RegionInfo> regions = admin.getRegions(this.tableName);
|
||||
for (RegionInfo ri: regions) {
|
||||
admin.unassign(ri.getRegionName(), false);
|
||||
}
|
||||
List<Long> pids = new ArrayList<>(regions.size());
|
||||
for (RegionInfo ri: regions) {
|
||||
Procedure<MasterProcedureEnv> p = new StallingAssignProcedure(ri);
|
||||
pids.add(TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().
|
||||
submitProcedure(p));
|
||||
}
|
||||
for (Long pid: pids) {
|
||||
while (!TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().isStarted(pid)) {
|
||||
Thread.currentThread().yield();
|
||||
}
|
||||
}
|
||||
// Call bypass on all. We should be stuck in the dispatch at this stage.
|
||||
List<Procedure<MasterProcedureEnv>> ps =
|
||||
TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().getProcedures();
|
||||
for (Procedure<MasterProcedureEnv> p: ps) {
|
||||
if (p instanceof StallingAssignProcedure) {
|
||||
List<Boolean> bs = TEST_UTIL.getHbck().
|
||||
bypassProcedure(Arrays.<Long>asList(p.getProcId()), 0, false, false);
|
||||
for (Boolean b: bs) {
|
||||
LOG.info("BYPASSED {} {}", p.getProcId(), b);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Countdown the latch so its not hanging out.
|
||||
for (Procedure<MasterProcedureEnv> p: ps) {
|
||||
if (p instanceof StallingAssignProcedure) {
|
||||
((StallingAssignProcedure)p).latch.countDown();
|
||||
}
|
||||
}
|
||||
// Try and assign WITHOUT override flag. Should fail!.
|
||||
for (RegionInfo ri: regions) {
|
||||
try {
|
||||
admin.assign(ri.getRegionName());
|
||||
} catch (Throwable dnrioe) {
|
||||
// Expected
|
||||
LOG.info("Expected {}", dnrioe);
|
||||
}
|
||||
}
|
||||
while (!TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().
|
||||
getActiveProcIds().isEmpty()) {
|
||||
Thread.currentThread().yield();
|
||||
}
|
||||
// Now assign with the override flag.
|
||||
for (RegionInfo ri: regions) {
|
||||
TEST_UTIL.getHbck().assigns(Arrays.<String>asList(ri.getEncodedName()), true);
|
||||
}
|
||||
while (!TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().
|
||||
getActiveProcIds().isEmpty()) {
|
||||
Thread.currentThread().yield();
|
||||
}
|
||||
for (RegionInfo ri: regions) {
|
||||
assertTrue(ri.toString(), TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().
|
||||
getRegionStates().isRegionOnline(ri));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An AssignProcedure that Stalls just before the finish.
|
||||
*/
|
||||
public static class StallingAssignProcedure extends AssignProcedure {
|
||||
public final CountDownLatch latch = new CountDownLatch(2);
|
||||
|
||||
public StallingAssignProcedure() {
|
||||
super();
|
||||
}
|
||||
|
||||
public StallingAssignProcedure(RegionInfo regionInfo) {
|
||||
super(regionInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
void setTransitionState(MasterProcedureProtos.RegionTransitionState state) {
|
||||
if (state == MasterProcedureProtos.RegionTransitionState.REGION_TRANSITION_DISPATCH) {
|
||||
try {
|
||||
LOG.info("LATCH2 {}", this.latch.getCount());
|
||||
this.latch.await();
|
||||
LOG.info("LATCH3 {}", this.latch.getCount());
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} else if (state == MasterProcedureProtos.RegionTransitionState.REGION_TRANSITION_QUEUE) {
|
||||
// Set latch.
|
||||
LOG.info("LATCH1 {}", this.latch.getCount());
|
||||
this.latch.countDown();
|
||||
}
|
||||
super.setTransitionState(state);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -31,12 +31,11 @@ EOF
|
|||
end
|
||||
|
||||
def command
|
||||
formatter.header(%w[Id Name State Submitted_Time Last_Update Parameters])
|
||||
|
||||
formatter.header(%w[PID Name State Submitted Last_Update Parameters])
|
||||
list = JSON.parse(admin.list_procedures)
|
||||
list.each do |proc|
|
||||
submitted_time = Time.at(Integer(proc['submittedTime']) / 1000).to_s
|
||||
last_update = Time.at(Integer(proc['lastUpdate']) / 1000).to_s
|
||||
submitted_time = Time.at(Integer(proc['submittedTime'])/1000).to_s
|
||||
last_update = Time.at(Integer(proc['lastUpdate'])/1000).to_s
|
||||
formatter.row([proc['procId'], proc['className'], proc['state'],
|
||||
submitted_time, last_update, proc['stateMessage']])
|
||||
end
|
||||
|
|
|
@ -39,5 +39,4 @@ public class TestShell extends AbstractTestShell {
|
|||
// Start all ruby tests
|
||||
jruby.runScriptlet(PathType.ABSOLUTE, "src/test/ruby/tests_runner.rb");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -85,9 +85,9 @@ class ShellTest < Test::Unit::TestCase
|
|||
|
||||
define_test "Shell::Shell interactive mode should not throw" do
|
||||
# incorrect number of arguments
|
||||
@shell.command('create', 'foo')
|
||||
@shell.command('create', 'foo', 'family_1')
|
||||
@shell.command('create', 'nothrow_table')
|
||||
@shell.command('create', 'nothrow_table', 'family_1')
|
||||
# create a table that exists
|
||||
@shell.command('create', 'foo', 'family_1')
|
||||
@shell.command('create', 'nothrow_table', 'family_1')
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue