HBASE-21075 Confirm that we can (rolling) upgrade from 2.0.x and 2.1.x to 2.2.x after HBASE-20881
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
0cd23c3dda
commit
63f718974b
|
@ -86,6 +86,12 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
"hbase.procedure.worker.keep.alive.time.msec";
|
"hbase.procedure.worker.keep.alive.time.msec";
|
||||||
private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
|
private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
|
||||||
|
|
||||||
|
// Enable this flag if you want to upgrade to 2.2+, there are some incompatible changes on how we
|
||||||
|
// assign or unassign a region, so we need to make sure all these procedures have been finished
|
||||||
|
// before we start the master with new code. See HBASE-20881 and HBASE-21075 for more details.
|
||||||
|
public static final String UPGRADE_TO_2_2 = "hbase.procedure.upgrade-to-2-2";
|
||||||
|
private static final boolean DEFAULT_UPGRADE_TO_2_2 = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
|
* {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
|
||||||
* break PE having it fail at various junctures. When non-null, testing is set to an instance of
|
* break PE having it fail at various junctures. When non-null, testing is set to an instance of
|
||||||
|
@ -363,6 +369,8 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
// execution of the same procedure.
|
// execution of the same procedure.
|
||||||
private final IdLock procExecutionLock = new IdLock();
|
private final IdLock procExecutionLock = new IdLock();
|
||||||
|
|
||||||
|
private final boolean upgradeTo2_2;
|
||||||
|
|
||||||
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
|
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
|
||||||
final ProcedureStore store) {
|
final ProcedureStore store) {
|
||||||
this(conf, environment, store, new SimpleProcedureScheduler());
|
this(conf, environment, store, new SimpleProcedureScheduler());
|
||||||
|
@ -394,6 +402,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
this.store = store;
|
this.store = store;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
|
this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
|
||||||
|
this.upgradeTo2_2 = conf.getBoolean(UPGRADE_TO_2_2, DEFAULT_UPGRADE_TO_2_2);
|
||||||
refreshConfiguration(conf);
|
refreshConfiguration(conf);
|
||||||
store.registerListener(new ProcedureStoreListener() {
|
store.registerListener(new ProcedureStoreListener() {
|
||||||
|
|
||||||
|
@ -721,6 +730,25 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
// Internal chores
|
// Internal chores
|
||||||
timeoutExecutor.add(new WorkerMonitor());
|
timeoutExecutor.add(new WorkerMonitor());
|
||||||
|
|
||||||
|
if (upgradeTo2_2) {
|
||||||
|
timeoutExecutor.add(new InlineChore() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (procedures.isEmpty()) {
|
||||||
|
LOG.info("UPGRADE OK: All existed procedures have been finished, quit...");
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getTimeoutInterval() {
|
||||||
|
// check every 5 seconds to see if we can quit
|
||||||
|
return 5000;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Add completed cleaner chore
|
// Add completed cleaner chore
|
||||||
addChore(new CompletedProcedureCleaner<>(conf, store, completed, nonceKeysToProcIdsMap));
|
addChore(new CompletedProcedureCleaner<>(conf, store, completed, nonceKeysToProcIdsMap));
|
||||||
}
|
}
|
||||||
|
@ -1203,10 +1231,9 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Stored " + Arrays.toString(procs));
|
LOG.debug("Stored " + Arrays.toString(procs));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add the procedure to the executor
|
// Add the procedure to the executor
|
||||||
for (int i = 0; i < procs.length; ++i) {
|
for (Procedure<TEnvironment> proc : procs) {
|
||||||
pushProcedure(procs[i]);
|
pushProcedure(proc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1220,7 +1247,12 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private long pushProcedure(Procedure<TEnvironment> proc) {
|
private long pushProcedure(Procedure<TEnvironment> proc) {
|
||||||
final long currentProcId = proc.getProcId();
|
long currentProcId = proc.getProcId();
|
||||||
|
// If we are going to upgrade to 2.2+, and this is not a sub procedure, do not push it to
|
||||||
|
// scheduler. After we finish all the ongoing procedures, the master will quit.
|
||||||
|
if (upgradeTo2_2 && !proc.hasParent()) {
|
||||||
|
return currentProcId;
|
||||||
|
}
|
||||||
|
|
||||||
// Update metrics on start of a procedure
|
// Update metrics on start of a procedure
|
||||||
proc.updateMetricsOnSubmit(getEnvironment());
|
proc.updateMetricsOnSubmit(getEnvironment());
|
||||||
|
|
Loading…
Reference in New Issue