HBASE-20708 Remove the usage of RecoverMetaProcedure in master startup

zhangduo 2018-06-19 15:02:10 +08:00
parent b336da925a
commit 6dbbd78aa0
45 changed files with 610 additions and 746 deletions

MetaTableAccessor.java

@@ -1346,9 +1346,17 @@ public class MetaTableAccessor {
    */
   public static void putsToMetaTable(final Connection connection, final List<Put> ps)
       throws IOException {
+    if (ps.isEmpty()) {
+      return;
+    }
     try (Table t = getMetaHTable(connection)) {
       debugLogMutations(ps);
-      t.put(ps);
+      // the implementation for putting a single Put is much simpler so here we do a check first.
+      if (ps.size() == 1) {
+        t.put(ps.get(0));
+      } else {
+        t.put(ps);
+      }
     }
   }

ProcedureExecutor.java

@@ -511,21 +511,16 @@ public class ProcedureExecutor<TEnvironment> {
   }

   /**
-   * Start the procedure executor.
-   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to
-   * recover the lease, and ensure a single executor, and start the procedure
-   * replay to resume and recover the previous pending and in-progress perocedures.
-   *
+   * Initialize the procedure executor, but do not start workers. We will start them later.
+   * <p/>
+   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
+   * ensure a single executor, and start the procedure replay to resume and recover the previous
+   * pending and in-progress procedures.
    * @param numThreads number of threads available for procedure execution.
-   * @param abortOnCorruption true if you want to abort your service in case
-   *          a corrupted procedure is found on replay. otherwise false.
+   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
+   *          is found on replay. otherwise false.
    */
-  public void start(int numThreads, boolean abortOnCorruption) throws IOException {
-    if (!running.compareAndSet(false, true)) {
-      LOG.warn("Already running");
-      return;
-    }
+  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
     // We have numThreads executor + one timer thread used for timing out
     // procedures and triggering periodic procedures.
     this.corePoolSize = numThreads;
@@ -546,11 +541,11 @@ public class ProcedureExecutor<TEnvironment> {
     long st, et;

     // Acquire the store lease.
-    st = EnvironmentEdgeManager.currentTime();
+    st = System.nanoTime();
     store.recoverLease();
-    et = EnvironmentEdgeManager.currentTime();
+    et = System.nanoTime();
     LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
-      StringUtils.humanTimeDiff(et - st));
+      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));

     // start the procedure scheduler
     scheduler.start();
@@ -560,12 +555,21 @@ public class ProcedureExecutor<TEnvironment> {
     // The first one will make sure that we have the latest id,
     // so we can start the threads and accept new procedures.
     // The second step will do the actual load of old procedures.
-    st = EnvironmentEdgeManager.currentTime();
+    st = System.nanoTime();
     load(abortOnCorruption);
-    et = EnvironmentEdgeManager.currentTime();
+    et = System.nanoTime();
     LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
-      StringUtils.humanTimeDiff(et - st));
+      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
+  }
+
+  /**
+   * Start the workers.
+   */
+  public void startWorkers() throws IOException {
+    if (!running.compareAndSet(false, true)) {
+      LOG.warn("Already running");
+      return;
+    }
     // Start the executors. Here we must have the lastProcId set.
     LOG.trace("Start workers {}", workerThreads.size());
     timeoutExecutor.start();
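
Aside: note the timing change in the hunks above. EnvironmentEdgeManager.currentTime() is a
wall-clock source, while System.nanoTime() is monotonic and therefore safe for measuring elapsed
time across clock adjustments. A minimal sketch of the pattern, with doWork() standing in for the
real store.recoverLease()/load() calls:

  import java.util.concurrent.TimeUnit;

  long st = System.nanoTime();
  doWork(); // e.g. store.recoverLease() or load(abortOnCorruption)
  long et = System.nanoTime();
  // StringUtils.humanTimeDiff expects milliseconds, hence the explicit conversion
  long elapsedMs = TimeUnit.NANOSECONDS.toMillis(et - st);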
@@ -861,7 +865,6 @@ public class ProcedureExecutor<TEnvironment> {
       justification = "FindBugs is blind to the check-for-null")
   public long submitProcedure(final Procedure proc, final NonceKey nonceKey) {
     Preconditions.checkArgument(lastProcId.get() >= 0);
-    Preconditions.checkArgument(isRunning(), "executor not running");

     prepareProcedure(proc);
@@ -895,7 +898,6 @@ public class ProcedureExecutor<TEnvironment> {
   // TODO: Do we need to take nonces here?
   public void submitProcedures(final Procedure[] procs) {
     Preconditions.checkArgument(lastProcId.get() >= 0);
-    Preconditions.checkArgument(isRunning(), "executor not running");
     if (procs == null || procs.length <= 0) {
       return;
     }
@@ -919,7 +921,6 @@ public class ProcedureExecutor<TEnvironment> {
   private Procedure prepareProcedure(final Procedure proc) {
     Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
-    Preconditions.checkArgument(isRunning(), "executor not running");
     Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
     if (this.checkOwnerSet) {
       Preconditions.checkArgument(proc.hasOwner(), "missing owner");
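
With start() split into init() and startWorkers(), and the isRunning() preconditions removed from
the submit path above, procedures can now be submitted after init() but before any worker runs.
A hedged sketch of the resulting two-phase lifecycle from a caller's point of view (the two
lifecycle calls are the real API from this patch; the conf/env/store/numThreads wiring is
illustrative):

  ProcedureExecutor<MyEnv> executor = new ProcedureExecutor<>(conf, env, store);
  store.start(numThreads);
  executor.init(numThreads, abortOnCorruption); // recover store lease, replay old procedures
  // ... the caller may now submit procedures (the master schedules SCPs at this point)
  // without any worker picking them up ...
  executor.startWorkers();                      // only now does execution begin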

ProcedureTestingUtility.java

@@ -70,6 +70,12 @@ public class ProcedureTestingUtility {
     restart(procExecutor, false, true, null, null);
   }

+  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
+      boolean abortOnCorruption) throws IOException {
+    procExecutor.init(numThreads, abortOnCorruption);
+    procExecutor.startWorkers();
+  }
+
   public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
       final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted,
       final Callable<Void> stopAction, final Callable<Void> startAction)
@@ -98,7 +104,7 @@ public class ProcedureTestingUtility {
     // re-start
     LOG.info("RESTART - Start");
     procStore.start(storeThreads);
-    procExecutor.start(execThreads, failOnCorrupted);
+    initAndStartWorkers(procExecutor, execThreads, failOnCorrupted);
     if (startAction != null) {
       startAction.call();
     }
@@ -183,7 +189,7 @@ public class ProcedureTestingUtility {
     NoopProcedureStore procStore = new NoopProcedureStore();
     ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
     procStore.start(1);
-    procExecutor.start(1, false);
+    initAndStartWorkers(procExecutor, 1, false);
     try {
       return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
     } finally {

TestChildProcedures.java

@@ -66,10 +66,10 @@ public class TestChildProcedures {
     logDir = new Path(testDir, "proc-logs");
     procEnv = new TestProcEnv();
     procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
   }

   @After

TestProcedureEvents.java

@@ -67,9 +67,9 @@ public class TestProcedureEvents {
     procEnv = new TestProcEnv();
     procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
     procStore.start(1);
-    procExecutor.start(1, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
   }

   @After

TestProcedureExecution.java

@@ -71,9 +71,9 @@ public class TestProcedureExecution {
     logDir = new Path(testDir, "proc-logs");
     procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
   }

   @After

TestProcedureExecutor.java

@@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
 import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -71,8 +70,8 @@ public class TestProcedureExecutor {
   }

   private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception {
-    procExecutor = new ProcedureExecutor(conf, procEnv, procStore);
-    procExecutor.start(numThreads, true);
+    procExecutor = new ProcedureExecutor<>(conf, procEnv, procStore);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, true);
   }

   @Test

TestProcedureInMemoryChore.java

@@ -53,17 +53,16 @@ public class TestProcedureInMemoryChore {

   private HBaseCommonTestingUtility htu;

-  @SuppressWarnings("rawtypes")
   @Before
   public void setUp() throws IOException {
     htu = new HBaseCommonTestingUtility();
     procEnv = new TestProcEnv();
     procStore = new NoopProcedureStore();
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
   }

   @After

TestProcedureMetrics.java

@@ -75,7 +75,7 @@ public class TestProcedureMetrics {
     procExecutor = new ProcedureExecutor<TestProcEnv>(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
   }

   @After

TestProcedureNonce.java

@@ -72,10 +72,10 @@ public class TestProcedureNonce {
     logDir = new Path(testDir, "proc-logs");
     procEnv = new TestProcEnv();
     procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
   }

   @After

TestProcedureRecovery.java

@@ -76,10 +76,10 @@ public class TestProcedureRecovery {
     logDir = new Path(testDir, "proc-logs");
     procEnv = new TestProcEnv();
     procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
     procSleepInterval = 0;
   }

TestProcedureReplayOrder.java

@@ -53,7 +53,7 @@ public class TestProcedureReplayOrder {
   private static final int NUM_THREADS = 16;

-  private ProcedureExecutor<Void> procExecutor;
+  private ProcedureExecutor<TestProcedureEnv> procExecutor;
   private TestProcedureEnv procEnv;
   private ProcedureStore procStore;
@@ -74,9 +74,9 @@ public class TestProcedureReplayOrder {
     logDir = new Path(testDir, "proc-logs");
     procEnv = new TestProcedureEnv();
     procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
     procStore.start(NUM_THREADS);
-    procExecutor.start(1, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
   }

   @After

TestProcedureSuspended.java

@@ -60,9 +60,9 @@ public class TestProcedureSuspended {
     htu = new HBaseCommonTestingUtility();

     procStore = new NoopProcedureStore();
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
   }

   @After

TestStateMachineProcedure.java

@@ -81,9 +81,9 @@ public class TestStateMachineProcedure {
     logDir = new Path(testDir, "proc-logs");
     procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
   }

   @After

TestYieldProcedures.java

@@ -71,10 +71,10 @@ public class TestYieldProcedures {
     logDir = new Path(testDir, "proc-logs");
     procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
     procRunnables = new TestScheduler();
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(),
-      procStore, procRunnables);
+    procExecutor =
+      new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
   }

   @After

MasterProcedure.proto

@@ -295,15 +295,17 @@ message RecoverMetaStateData {
 enum ServerCrashState {
   SERVER_CRASH_START = 1;
-  SERVER_CRASH_PROCESS_META = 2;
+  SERVER_CRASH_PROCESS_META = 2[deprecated=true];
   SERVER_CRASH_GET_REGIONS = 3;
-  SERVER_CRASH_NO_SPLIT_LOGS = 4;
+  SERVER_CRASH_NO_SPLIT_LOGS = 4[deprecated=true];
   SERVER_CRASH_SPLIT_LOGS = 5;
   // Removed SERVER_CRASH_PREPARE_LOG_REPLAY = 6;
   // Removed SERVER_CRASH_CALC_REGIONS_TO_ASSIGN = 7;
   SERVER_CRASH_ASSIGN = 8;
   SERVER_CRASH_WAIT_ON_ASSIGN = 9;
-  SERVER_CRASH_HANDLE_RIT2 = 20;
+  SERVER_CRASH_SPLIT_META_LOGS = 10;
+  SERVER_CRASH_ASSIGN_META = 11;
+  SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true];
   SERVER_CRASH_FINISH = 100;
 }
@@ -445,3 +447,10 @@ enum ReopenTableRegionsState {
 message ReopenTableRegionsStateData {
   required TableName table_name = 1;
 }
+
+enum InitMetaState {
+  INIT_META_ASSIGN_META = 1;
+}
+
+message InitMetaStateData {
+}

CatalogJanitor.java

@@ -111,17 +111,14 @@ public class CatalogJanitor extends ScheduledChore {
   protected void chore() {
     try {
       AssignmentManager am = this.services.getAssignmentManager();
-      if (this.enabled.get()
-          && !this.services.isInMaintenanceMode()
-          && am != null
-          && am.isFailoverCleanupDone()
-          && !am.hasRegionsInTransition()) {
+      if (this.enabled.get() && !this.services.isInMaintenanceMode() && am != null &&
+        am.isMetaLoaded() && !am.hasRegionsInTransition()) {
         scan();
       } else {
         LOG.warn("CatalogJanitor is disabled! Enabled=" + this.enabled.get() +
-          ", maintenanceMode=" + this.services.isInMaintenanceMode() +
-          ", am=" + am + ", failoverCleanupDone=" + (am != null && am.isFailoverCleanupDone()) +
-          ", hasRIT=" + (am != null && am.hasRegionsInTransition()));
+          ", maintenanceMode=" + this.services.isInMaintenanceMode() + ", am=" + am +
+          ", metaLoaded=" + (am != null && am.isMetaLoaded()) + ", hasRIT=" +
+          (am != null && am.hasRegionsInTransition()));
       }
     } catch (IOException e) {
       LOG.warn("Failed scan of catalog table", e);

HMaster.java

@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -122,13 +124,14 @@ import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
-import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
 import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
 import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
@@ -235,7 +238,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
 @SuppressWarnings("deprecation")
 public class HMaster extends HRegionServer implements MasterServices {
-  private static Logger LOG = LoggerFactory.getLogger(HMaster.class.getName());
+  private static Logger LOG = LoggerFactory.getLogger(HMaster.class);

   /**
    * Protection against zombie master. Started once Master accepts active responsibility and
@@ -351,10 +354,6 @@ public class HMaster extends HRegionServer implements MasterServices {
   // initialization may have not completed yet.
   volatile boolean serviceStarted = false;

-  // flag set after we complete assignMeta.
-  private final ProcedureEvent<?> serverCrashProcessingEnabled =
-    new ProcedureEvent<>("server crash processing");
-
   // Maximum time we should run balancer for
   private final int maxBlancingTime;
   // Maximum percent of regions in transition when balancing
@@ -725,7 +724,8 @@ public class HMaster extends HRegionServer implements MasterServices {
   /**
    * <p>
-   * Initialize all ZK based system trackers.
+   * Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
+   * should have already been initialized along with {@link ServerManager}.
    * </p>
    * <p>
    * Will be overridden in tests.
@@ -747,15 +747,8 @@
     this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
     this.splitOrMergeTracker.start();

-    // Create Assignment Manager
-    this.assignmentManager = new AssignmentManager(this);
-    this.assignmentManager.start();
-
     this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);

-    this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
-    this.regionServerTracker.start();
-
     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
     this.drainingServerTracker.start();
@@ -800,18 +793,40 @@
   /**
    * Finish initialization of HMaster after becoming the primary master.
-   *
+   * <p/>
+   * The startup order is a bit complicated but very important, do not change it unless you know
+   * what you are doing.
    * <ol>
-   * <li>Initialize master components - file system manager, server manager,
-   * assignment manager, region server tracker, etc</li>
-   * <li>Start necessary service threads - balancer, catalog janior,
-   * executor services, etc</li>
-   * <li>Set cluster as UP in ZooKeeper</li>
-   * <li>Wait for RegionServers to check-in</li>
-   * <li>Split logs and perform data recovery, if necessary</li>
-   * <li>Ensure assignment of meta/namespace regions<li>
-   * <li>Handle either fresh cluster start or master failover</li>
+   * <li>Initialize file system based components - file system manager, wal manager, table
+   * descriptors, etc</li>
+   * <li>Publish cluster id</li>
+   * <li>Here comes the most complicated part - initialize server manager, assignment manager and
+   * region server tracker
+   * <ol type='i'>
+   * <li>Create server manager</li>
+   * <li>Create procedure executor, load the procedures, but do not start workers. We will start it
+   * later after we finish scheduling SCPs to avoid scheduling duplicated SCPs for the same
+   * server</li>
+   * <li>Create assignment manager and start it, load the meta region state, but do not load data
+   * from meta region</li>
+   * <li>Start region server tracker, construct the online servers set and find out dead servers and
+   * schedule SCP for them. The online servers will be constructed by scanning zk, and we will also
+   * scan the wal directory to find out possible live region servers, and the differences between
+   * these two sets are the dead servers</li>
+   * </ol>
+   * </li>
+   * <li>If this is a new deploy, schedule a InitMetaProcedure to initialize meta</li>
+   * <li>Start necessary service threads - balancer, catalog janior, executor services, and also the
+   * procedure executor, etc. Notice that the balancer must be created first as assignment manager
+   * may use it when assigning regions.</li>
+   * <li>Wait for meta to be initialized if necesssary, start table state manager.</li>
+   * <li>Wait for enough region servers to check-in</li>
+   * <li>Let assignment manager load data from meta and construct region states</li>
+   * <li>Start all other things such as chore services, etc</li>
    * </ol>
+   * <p/>
+   * Notice that now we will not schedule a special procedure to make meta online(unless the first
+   * time where meta has not been created yet), we will rely on SCP to bring meta online.
    */
   private void finishActiveMasterInitialization(MonitoredTask status) throws IOException,
       InterruptedException, KeeperException, ReplicationException {
@@ -849,10 +864,20 @@
     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
     this.clusterId = clusterId.toString();

-    this.serverManager = createServerManager(this);
-
-    // This manager is started AFTER hbase:meta is confirmed on line.
-    // See inside metaBootstrap.recoverMeta(); below. Shouldn't be so cryptic!
+    status.setStatus("Initialze ServerManager and schedule SCP for crash servers");
+    this.serverManager = createServerManager(this);
+    createProcedureExecutor();
+    // Create Assignment Manager
+    this.assignmentManager = new AssignmentManager(this);
+    this.assignmentManager.start();
+    this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
+    this.regionServerTracker.start(
+      procedureExecutor.getProcedures().stream().filter(p -> p instanceof ServerCrashProcedure)
+        .map(p -> ((ServerCrashProcedure) p).getServerName()).collect(Collectors.toSet()),
+      walManager.getLiveServersFromWALDir());
+
+    // This manager will be started AFTER hbase:meta is confirmed on line.
     // hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
     // state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
     this.tableStateManager =
@@ -888,10 +913,37 @@
     status.setStatus("Initializing master coprocessors");
     this.cpHost = new MasterCoprocessorHost(this, this.conf);

+    status.setStatus("Initializing meta table if this is a new deploy");
+    InitMetaProcedure initMetaProc = null;
+    if (assignmentManager.getRegionStates().getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO)
+      .isOffline()) {
+      Optional<Procedure<?>> optProc = procedureExecutor.getProcedures().stream()
+        .filter(p -> p instanceof InitMetaProcedure).findAny();
+      if (optProc.isPresent()) {
+        initMetaProc = (InitMetaProcedure) optProc.get();
+      } else {
+        // schedule an init meta procedure if meta has not been deployed yet
+        initMetaProc = new InitMetaProcedure();
+        procedureExecutor.submitProcedure(initMetaProc);
+      }
+    }
+    if (this.balancer instanceof FavoredNodesPromoter) {
+      favoredNodesManager = new FavoredNodesManager(this);
+    }
+    // initialize load balancer
+    this.balancer.setMasterServices(this);
+    this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
+    this.balancer.initialize();
+
     // start up all service threads.
     status.setStatus("Initializing master service threads");
     startServiceThreads();
+    // wait meta to be initialized after we start procedure executor
+    if (initMetaProc != null) {
+      initMetaProc.await();
+    }
+    tableStateManager.start();

     // Wake up this server to check in
     sleeper.skipSleepCycle();
@@ -903,28 +955,11 @@
     LOG.info(Objects.toString(status));
     waitForRegionServers(status);

-    if (this.balancer instanceof FavoredNodesPromoter) {
-      favoredNodesManager = new FavoredNodesManager(this);
-    }
-
-    //initialize load balancer
-    this.balancer.setMasterServices(this);
-    this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
-    this.balancer.initialize();
-
-    // Make sure meta assigned before proceeding.
-    status.setStatus("Recovering Meta Region");
-
     // Check if master is shutting down because issue initializing regionservers or balancer.
     if (isStopped()) {
       return;
     }

-    // Bring up hbase:meta. recoverMeta is a blocking call waiting until hbase:meta is deployed.
-    // It also starts the TableStateManager.
-    MasterMetaBootstrap metaBootstrap = createMetaBootstrap();
-    metaBootstrap.recoverMeta();
-
     //Initialize after meta as it scans meta
     if (favoredNodesManager != null) {
       SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
@@ -933,9 +968,6 @@
       favoredNodesManager.initialize(snapshotOfRegionAssignment);
     }

-    status.setStatus("Submitting log splitting work for previously failed region servers");
-    metaBootstrap.processDeadServers();
-
     // Fix up assignment manager status
     status.setStatus("Starting assignment manager");
     this.assignmentManager.joinCluster();
@@ -977,6 +1009,7 @@
     setInitialized(true);
     assignmentManager.checkIfShouldMoveSystemRegionAsync();
     status.setStatus("Assign meta replicas");
+    MasterMetaBootstrap metaBootstrap = createMetaBootstrap();
     metaBootstrap.assignMetaReplicas();
     status.setStatus("Starting quota manager");
     initQuotaManager();
@@ -1119,7 +1152,6 @@
   private void initQuotaManager() throws IOException {
     MasterQuotaManager quotaManager = new MasterQuotaManager(this);
-    this.assignmentManager.setRegionStateListener(quotaManager);
     quotaManager.start();
     this.quotaManager = quotaManager;
   }
@@ -1281,10 +1313,10 @@
     }
   }

-  private void startProcedureExecutor() throws IOException {
-    final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
-    procedureStore = new WALProcedureStore(conf,
-      new MasterProcedureEnv.WALStoreLeaseRecovery(this));
+  private void createProcedureExecutor() throws IOException {
+    MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
+    procedureStore =
+      new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this));
     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
     MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
     procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler);
@@ -1297,10 +1329,17 @@
       conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
         MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
     procedureStore.start(numThreads);
-    procedureExecutor.start(numThreads, abortOnCorruption);
+    // Just initialize it but do not start the workers, we will start the workers later by calling
+    // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more
+    // details.
+    procedureExecutor.init(numThreads, abortOnCorruption);
     procEnv.getRemoteDispatcher().start();
   }

+  private void startProcedureExecutor() throws IOException {
+    procedureExecutor.startWorkers();
+  }
+
   private void stopProcedureExecutor() {
     if (procedureExecutor != null) {
       configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
@@ -2857,25 +2896,6 @@
     return initialized;
   }

-  /**
-   * ServerCrashProcessingEnabled is set false before completing assignMeta to prevent processing
-   * of crashed servers.
-   * @return true if assignMeta has completed;
-   */
-  @Override
-  public boolean isServerCrashProcessingEnabled() {
-    return serverCrashProcessingEnabled.isReady();
-  }
-
-  @VisibleForTesting
-  public void setServerCrashProcessingEnabled(final boolean b) {
-    procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
-  }
-
-  public ProcedureEvent<?> getServerCrashProcessingEnabledEvent() {
-    return serverCrashProcessingEnabled;
-  }
-
   /**
    * Compute the average load across all region servers.
    * Currently, this uses a very naive computation - just uses the number of
@@ -3623,18 +3643,6 @@
     return lockManager;
   }

-  @Override
-  public boolean recoverMeta() throws IOException {
-    // we need to block here so the latch should be greater than the current version to make sure
-    // that we will block.
-    ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(Integer.MAX_VALUE, 0);
-    procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, latch));
-    latch.await();
-    LOG.info("hbase:meta deployed at={}",
-      getMetaTableLocator().getMetaRegionLocation(getZooKeeper()));
-    return assignmentManager.isMetaInitialized();
-  }
-
   public QuotaObserverChore getQuotaObserverChore() {
     return this.quotaObserverChore;
   }
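
Putting the HMaster changes together, a condensed sketch of the new startup order documented in
finishActiveMasterInitialization (method names come from this patch; error handling, unrelated
steps and the exact SCP-collection stream are elided, and serversWithExistingSCPs stands in for
the set derived from procedureExecutor.getProcedures()):

  this.serverManager = createServerManager(this);
  createProcedureExecutor();               // init() only, workers not started yet
  this.assignmentManager = new AssignmentManager(this);
  this.assignmentManager.start();          // meta region state only, meta not scanned yet
  this.regionServerTracker.start(          // finds dead servers, schedules SCPs for them
    serversWithExistingSCPs, walManager.getLiveServersFromWALDir());
  // new deploy only: procedureExecutor.submitProcedure(new InitMetaProcedure())
  startServiceThreads();                   // includes startProcedureExecutor() -> startWorkers()
  // wait on InitMetaProcedure (if any), tableStateManager.start(),
  // waitForRegionServers(...), then assignmentManager.joinCluster() loads meta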

MasterMetaBootstrap.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master;
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory;
  * Used by the HMaster on startup to split meta logs and assign the meta table.
  */
 @InterfaceAudience.Private
-public class MasterMetaBootstrap {
+class MasterMetaBootstrap {
   private static final Logger LOG = LoggerFactory.getLogger(MasterMetaBootstrap.class);

   private final HMaster master;
@@ -48,35 +47,12 @@
     this.master = master;
   }

-  public void recoverMeta() throws InterruptedException, IOException {
-    // This is a blocking call that waits until hbase:meta is deployed.
-    master.recoverMeta();
-    // Now we can start the TableStateManager. It is backed by hbase:meta.
-    master.getTableStateManager().start();
-    // Enable server crash procedure handling
-    enableCrashedServerProcessing();
-  }
-
-  public void processDeadServers() {
-    // get a list for previously failed RS which need log splitting work
-    // we recover hbase:meta region servers inside master initialization and
-    // handle other failed servers in SSH in order to start up master node ASAP
-    Set<ServerName> previouslyFailedServers =
-      master.getMasterWalManager().getFailedServersFromLogFolders();
-
-    // Master has recovered hbase:meta region server and we put
-    // other failed region servers in a queue to be handled later by SSH
-    for (ServerName tmpServer : previouslyFailedServers) {
-      master.getServerManager().processDeadServer(tmpServer, true);
-    }
-  }
-
   /**
    * For assigning hbase:meta replicas only.
    * TODO: The way this assign runs, nothing but chance to stop all replicas showing up on same
    * server as the hbase:meta region.
    */
-  protected void assignMetaReplicas()
+  void assignMetaReplicas()
       throws IOException, InterruptedException, KeeperException {
     int numReplicas = master.getConfiguration().getInt(HConstants.META_REPLICAS_NUM,
       HConstants.DEFAULT_META_REPLICA_NUM);
@@ -85,7 +61,7 @@
       return;
     }
     final AssignmentManager assignmentManager = master.getAssignmentManager();
-    if (!assignmentManager.isMetaInitialized()) {
+    if (!assignmentManager.isMetaLoaded()) {
       throw new IllegalStateException("hbase:meta must be initialized first before we can " +
         "assign out its replicas");
     }
@@ -137,15 +113,4 @@
       LOG.warn("Ignoring exception " + ex);
     }
   }
-
-  private void enableCrashedServerProcessing() throws InterruptedException {
-    // If crashed server processing is disabled, we enable it and expire those dead but not expired
-    // servers. This is required so that if meta is assigning to a server which dies after
-    // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
-    // stuck here waiting forever if waitForMeta is specified.
-    if (!master.isServerCrashProcessingEnabled()) {
-      master.setServerCrashProcessingEnabled(true);
-      master.getServerManager().processQueuedDeadServers();
-    }
-  }
 }

MasterServices.java

@@ -320,11 +320,6 @@ public interface MasterServices extends Server {
    */
   TableDescriptors getTableDescriptors();

-  /**
-   * @return true if master enables ServerShutdownHandler;
-   */
-  boolean isServerCrashProcessingEnabled();
-
   /**
    * Registers a new protocol buffer {@link Service} subclass as a master coprocessor endpoint.
    *
@@ -494,13 +489,6 @@
    */
   public void checkIfShouldMoveSystemRegionAsync();

-  /**
-   * Recover meta table. Will result in no-op is meta is already initialized. Any code that has
-   * access to master and requires to access meta during process initialization can call this
-   * method to make sure meta is initialized.
-   */
-  boolean recoverMeta() throws IOException;
-
   String getClientIdAuditPrefix();

   /**
/** /**

MasterWalManager.java

@@ -26,7 +26,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -142,10 +143,34 @@ public class MasterWalManager {
     return this.fsOk;
   }

+  public Set<ServerName> getLiveServersFromWALDir() throws IOException {
+    Path walDirPath = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+    FileStatus[] walDirForLiveServers = FSUtils.listStatus(fs, walDirPath,
+      p -> !p.getName().endsWith(AbstractFSWALProvider.SPLITTING_EXT));
+    if (walDirForLiveServers == null) {
+      return Collections.emptySet();
+    }
+    return Stream.of(walDirForLiveServers).map(s -> {
+      ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(s.getPath());
+      if (serverName == null) {
+        LOG.warn("Log folder {} doesn't look like its name includes a " +
+          "region server name; leaving in place. If you see later errors about missing " +
+          "write ahead logs they may be saved in this location.", s.getPath());
+        return null;
+      }
+      return serverName;
+    }).filter(s -> s != null).collect(Collectors.toSet());
+  }
+
   /**
    * Inspect the log directory to find dead servers which need recovery work
    * @return A set of ServerNames which aren't running but still have WAL files left in file system
+   * @deprecated With proc-v2, we can record the crash server with procedure store, so do not need
+   *             to scan the wal directory to find out the splitting wal directory any more. Leave
+   *             it here only because {@code RecoverMetaProcedure}(which is also deprecated) uses
+   *             it.
    */
+  @Deprecated
   public Set<ServerName> getFailedServersFromLogFolders() {
     boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
       WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
@@ -240,6 +265,7 @@
     boolean needReleaseLock = false;
     if (!this.services.isInitialized()) {
       // during master initialization, we could have multiple places splitting a same wal
+      // XXX: Does this still exist after we move to proc-v2?
       this.splitLogLock.lock();
       needReleaseLock = true;
     }

RegionServerTracker.java

@@ -46,19 +46,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;

 /**
- * <p>
  * Tracks the online region servers via ZK.
- * </p>
- * <p>
+ * <p/>
  * Handling of new RSs checking in is done via RPC. This class is only responsible for watching for
 * expired nodes. It handles listening for changes in the RS node list. The only exception is when
 * master restart, we will use the list fetched from zk to construct the initial set of live region
 * servers.
- * </p>
- * <p>
+ * <p/>
 * If an RS node gets deleted, this automatically handles calling of
 * {@link ServerManager#expireServer(ServerName)}
- * </p>
 */
 @InterfaceAudience.Private
 public class RegionServerTracker extends ZKListener {
@@ -76,7 +72,7 @@ public class RegionServerTracker extends ZKListener {
     super(watcher);
     this.server = server;
     this.serverManager = serverManager;
-    executor = Executors.newSingleThreadExecutor(
+    this.executor = Executors.newSingleThreadExecutor(
       new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build());
   }
@@ -109,14 +105,19 @@
   }

   /**
-   * <p>
-   * Starts the tracking of online RegionServers.
-   * </p>
-   * <p>
-   * All RSs will be tracked after this method is called.
-   * </p>
+   * Starts the tracking of online RegionServers. All RSes will be tracked after this method is
+   * called.
+   * <p/>
+   * In this method, we will also construct the region server sets in {@link ServerManager}. If a
+   * region server is dead between the crash of the previous master instance and the start of the
+   * current master instance, we will schedule a SCP for it. This is done in
+   * {@link ServerManager#findOutDeadServersAndProcess(Set, Set)}, we call it here under the lock
+   * protection to prevent concurrency issues with server expiration operation.
+   * @param deadServersFromPE the region servers which already have SCP associated.
+   * @param liveServersFromWALDir the live region servers from wal directory.
    */
-  public void start() throws KeeperException, IOException {
+  public void start(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir)
+      throws KeeperException, IOException {
     watcher.registerListener(this);
     synchronized (this) {
       List<String> servers =
@@ -132,6 +133,7 @@
           : ServerMetricsBuilder.of(serverName);
         serverManager.checkAndRecordNewServer(serverName, serverMetrics);
       }
+      serverManager.findOutDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
     }
   }

ServerManager.java

@ -25,13 +25,11 @@ import java.net.InetAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -180,41 +178,6 @@ public class ServerManager {
private final RpcControllerFactory rpcControllerFactory; private final RpcControllerFactory rpcControllerFactory;
/**
* Set of region servers which are dead but not processed immediately. If one
* server died before master enables ServerShutdownHandler, the server will be
* added to this set and will be processed through calling
* {@link ServerManager#processQueuedDeadServers()} by master.
* <p>
* A dead server is a server instance known to be dead, not listed in the /hbase/rs
* znode any more. It may have not been submitted to ServerShutdownHandler yet
* because the handler is not enabled.
* <p>
* A dead server, which has been submitted to ServerShutdownHandler while the
* handler is not enabled, is queued up.
* <p>
* So this is a set of region servers known to be dead but not submitted to
* ServerShutdownHandler for processing yet.
*/
private Set<ServerName> queuedDeadServers = new HashSet<>();
/**
* Set of region servers which are dead and submitted to ServerShutdownHandler to process but not
* fully processed immediately.
* <p>
* If one server died before assignment manager finished the failover cleanup, the server will be
* added to this set and will be processed through calling
* {@link ServerManager#processQueuedDeadServers()} by assignment manager.
* <p>
* The Boolean value indicates whether log split is needed inside ServerShutdownHandler
* <p>
* ServerShutdownHandler processes a dead server submitted to the handler after the handler is
* enabled. It may not be able to complete the processing because meta is not yet online or master
* is currently in startup mode. In this case, the dead server will be parked in this set
* temporarily.
*/
private Map<ServerName, Boolean> requeuedDeadServers = new ConcurrentHashMap<>();
/** Listeners that are called on server events. */ /** Listeners that are called on server events. */
private List<ServerListener> listeners = new CopyOnWriteArrayList<>(); private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
@ -377,6 +340,26 @@ public class ServerManager {
return true; return true;
} }
/**
* Find out the region servers crashed between the crash of the previous master instance and the
* current master instance and schedule SCP for them.
* <p/>
* Since the {@code RegionServerTracker} has already helped us to construct the online servers set
* by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
* to find out whether there are servers which are already dead.
* <p/>
* Must be called inside the initialization method of {@code RegionServerTracker} to avoid
* concurrency issue.
* @param deadServersFromPE the region servers which already have SCP associated.
* @param liveServersFromWALDir the live region servers from wal directory.
*/
void findOutDeadServersAndProcess(Set<ServerName> deadServersFromPE,
Set<ServerName> liveServersFromWALDir) {
deadServersFromPE.forEach(deadservers::add);
liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
.forEach(this::expireServer);
}
/** /**
* Checks if the clock skew between the server and the master. If the clock skew exceeds the * Checks if the clock skew between the server and the master. If the clock skew exceeds the
* configured max, it will throw an exception; if it exceeds the configured warning threshold, * configured max, it will throw an exception; if it exceeds the configured warning threshold,
@ -386,7 +369,7 @@ public class ServerManager {
* @throws ClockOutOfSyncException if the skew exceeds the configured max value * @throws ClockOutOfSyncException if the skew exceeds the configured max value
*/ */
private void checkClockSkew(final ServerName serverName, final long serverCurrentTime) private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
throws ClockOutOfSyncException { throws ClockOutOfSyncException {
long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime); long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
if (skew > maxSkew) { if (skew > maxSkew) {
String message = "Server " + serverName + " has been " + String message = "Server " + serverName + " has been " +
@ -406,9 +389,7 @@ public class ServerManager {
* If this server is on the dead list, reject it with a YouAreDeadException. * If this server is on the dead list, reject it with a YouAreDeadException.
* If it was dead but came back with a new start code, remove the old entry * If it was dead but came back with a new start code, remove the old entry
* from the dead list. * from the dead list.
* @param serverName
* @param what START or REPORT * @param what START or REPORT
* @throws org.apache.hadoop.hbase.YouAreDeadException
*/ */
private void checkIsDead(final ServerName serverName, final String what) private void checkIsDead(final ServerName serverName, final String what)
throws YouAreDeadException { throws YouAreDeadException {
@ -589,13 +570,12 @@ public class ServerManager {
return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode); return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
} }
/* /**
* Expire the passed server. Add it to list of dead servers and queue a * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
* shutdown processing. * @return True if we queued a ServerCrashProcedure else false if we did not (could happen for
* @return True if we queued a ServerCrashProcedure else false if we did not (could happen * many reasons including the fact that it's this server that is going down or we already
* for many reasons including the fact that it's this server that is going down or we already * have queued an SCP for this server or SCP processing is currently disabled because we
* have queued an SCP for this server or SCP processing is currently disabled because we are * are in startup phase).
* in startup phase).
*/ */
public synchronized boolean expireServer(final ServerName serverName) { public synchronized boolean expireServer(final ServerName serverName) {
// THIS server is going down... can't handle our own expiration. // THIS server is going down... can't handle our own expiration.
@ -605,18 +585,6 @@ public class ServerManager {
} }
return false; return false;
} }
// No SCP handling during startup.
if (!master.isServerCrashProcessingEnabled()) {
LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
+ "delay expiring server " + serverName);
// Even though we delay expiring this server, we still need to handle Meta's RITs
// that are against the crashed server; since when we do RecoverMetaProcedure,
// the SCP is not enabled yet and Meta's RITs may be suspended forever. See HBASE-19287
master.getAssignmentManager().handleMetaRITOnCrashedServer(serverName);
this.queuedDeadServers.add(serverName);
// Return true because though no SCP is queued now, there will be one queued later.
return true;
}
if (this.deadservers.isDeadServer(serverName)) { if (this.deadservers.isDeadServer(serverName)) {
LOG.warn("Expiration called on {} but crash processing already in progress", serverName); LOG.warn("Expiration called on {} but crash processing already in progress", serverName);
return false; return false;
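A hedged usage sketch of the return-value contract described in the rewritten javadoc above; everything except expireServer itself is illustrative. With the startup-deferral branch removed, false no longer hints at a deferred queue, so callers should not assume crash recovery is pending when the call declines.
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.ServerManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class ExpireServerSketch {
  private static final Logger LOG = LoggerFactory.getLogger(ExpireServerSketch.class);
  static void expireAndLog(ServerManager serverManager, ServerName serverName) {
    // false means no SCP was queued: it is this server itself going down, an
    // SCP already exists, or SCP processing is disabled during startup.
    if (!serverManager.expireServer(serverName)) {
      LOG.debug("No ServerCrashProcedure queued for {}", serverName);
    }
  }
}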
@ -665,52 +633,6 @@ public class ServerManager {
this.rsAdmins.remove(sn); this.rsAdmins.remove(sn);
} }
public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
// When assignment manager is cleaning up the zookeeper nodes and rebuilding the
// in-memory region states, region servers could be down. Meta table can and
// should be re-assigned, log splitting can be done too. However, it is better to
// wait till the cleanup is done before re-assigning user regions.
//
// We should not wait in the server shutdown handler thread since it can clog
// the handler threads, and the meta table could not be re-assigned if
// the corresponding server were down. So we queue them up here instead.
if (!master.getAssignmentManager().isFailoverCleanupDone()) {
requeuedDeadServers.put(serverName, shouldSplitWal);
return;
}
this.deadservers.add(serverName);
master.getAssignmentManager().submitServerCrash(serverName, shouldSplitWal);
}
/**
* Process the servers which died during master's initialization. It will be
* called after HMaster#assignMeta and AssignmentManager#joinCluster.
*/
synchronized void processQueuedDeadServers() {
if (!master.isServerCrashProcessingEnabled()) {
LOG.info("Master hasn't enabled ServerShutdownHandler");
}
Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
while (serverIterator.hasNext()) {
ServerName tmpServerName = serverIterator.next();
expireServer(tmpServerName);
serverIterator.remove();
requeuedDeadServers.remove(tmpServerName);
}
if (!master.getAssignmentManager().isFailoverCleanupDone()) {
if (LOG.isTraceEnabled()) {
LOG.trace("AssignmentManager failover cleanup not done.");
}
}
for (Map.Entry<ServerName, Boolean> entry : requeuedDeadServers.entrySet()) {
processDeadServer(entry.getKey(), entry.getValue());
}
requeuedDeadServers.clear();
}
/* /*
* Remove the server from the drain list. * Remove the server from the drain list.
*/ */
@ -975,13 +897,6 @@ public class ServerManager {
return new ArrayList<>(this.drainingServers); return new ArrayList<>(this.drainingServers);
} }
/**
* @return A copy of the internal set of deadNotExpired servers.
*/
Set<ServerName> getDeadNotExpiredServers() {
return new HashSet<>(this.queuedDeadServers);
}
public boolean isServerOnline(ServerName serverName) { public boolean isServerOnline(ServerName serverName) {
return serverName != null && onlineServers.containsKey(serverName); return serverName != null && onlineServers.containsKey(serverName);
} }
@ -993,9 +908,7 @@ public class ServerManager {
* master any more, for example, a very old previous instance). * master any more, for example, a very old previous instance).
*/ */
public synchronized boolean isServerDead(ServerName serverName) { public synchronized boolean isServerDead(ServerName serverName) {
return serverName == null || deadservers.isDeadServer(serverName) return serverName == null || deadservers.isDeadServer(serverName);
|| queuedDeadServers.contains(serverName)
|| requeuedDeadServers.containsKey(serverName);
} }
public void shutdownCluster() { public void shutdownCluster() {
@ -1061,8 +974,6 @@ public class ServerManager {
final List<ServerName> drainingServersCopy = getDrainingServersList(); final List<ServerName> drainingServersCopy = getDrainingServersList();
destServers.removeAll(drainingServersCopy); destServers.removeAll(drainingServersCopy);
// Remove the deadNotExpired servers from the server list.
removeDeadNotExpiredServers(destServers);
return destServers; return destServers;
} }
@ -1073,23 +984,6 @@ public class ServerManager {
return createDestinationServersList(null); return createDestinationServersList(null);
} }
/**
* Loop through the deadNotExpired server list and remove them from the
* servers.
* This function should be used carefully outside of this class. You should use a high level
* method such as {@link #createDestinationServersList()} instead of managing your own list.
*/
void removeDeadNotExpiredServers(List<ServerName> servers) {
Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
if (!deadNotExpiredServersCopy.isEmpty()) {
for (ServerName server : deadNotExpiredServersCopy) {
LOG.debug("Removing dead but not expired server: " + server
+ " from eligible server pool.");
servers.remove(server);
}
}
}
/** /**
* To clear any dead server with same host name and port of any online server * To clear any dead server with same host name and port of any online server
*/ */
@ -1259,7 +1153,6 @@ public class ServerManager {
} }
} }
private class FlushedSequenceIdFlusher extends ScheduledChore { private class FlushedSequenceIdFlusher extends ScheduledChore {
public FlushedSequenceIdFlusher(String name, int p) { public FlushedSequenceIdFlusher(String name, int p) {

View File

@ -1,4 +1,4 @@
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.master.assignment; package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException; import java.io.IOException;
@ -24,7 +23,6 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -40,7 +38,6 @@ import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.RegionException; import org.apache.hadoop.hbase.RegionException;
import org.apache.hadoop.hbase.RegionStateListener;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.YouAreDeadException;
@ -66,7 +63,6 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState; import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState;
import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode; import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer; import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
@ -84,7 +80,10 @@ import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -147,26 +146,13 @@ public class AssignmentManager implements ServerListener {
"hbase.metrics.rit.stuck.warning.threshold"; "hbase.metrics.rit.stuck.warning.threshold";
private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000; private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
private final ProcedureEvent<?> metaInitializedEvent = new ProcedureEvent<>("meta initialized"); private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load"); private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");
/**
* Indicator that AssignmentManager has recovered the region states so
* that ServerCrashProcedure can be fully enabled and re-assign regions
* of dead servers, so that when re-assignment happens, AssignmentManager
* has proper region states.
*/
private final ProcedureEvent<?> failoverCleanupDone = new ProcedureEvent<>("failover cleanup");
/** Listeners that are called on assignment events. */ /** Listeners that are called on assignment events. */
private final CopyOnWriteArrayList<AssignmentListener> listeners = private final CopyOnWriteArrayList<AssignmentListener> listeners =
new CopyOnWriteArrayList<AssignmentListener>(); new CopyOnWriteArrayList<AssignmentListener>();
// TODO: why is this different from the listeners (carried over from the old AM)
private RegionStateListener regionStateListener;
private RegionNormalizer regionNormalizer;
private final MetricsAssignmentManager metrics; private final MetricsAssignmentManager metrics;
private final RegionInTransitionChore ritChore; private final RegionInTransitionChore ritChore;
private final MasterServices master; private final MasterServices master;
@ -210,12 +196,9 @@ public class AssignmentManager implements ServerListener {
int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY,
DEFAULT_RIT_CHORE_INTERVAL_MSEC); DEFAULT_RIT_CHORE_INTERVAL_MSEC);
this.ritChore = new RegionInTransitionChore(ritChoreInterval); this.ritChore = new RegionInTransitionChore(ritChoreInterval);
// Used for region related procedure.
setRegionNormalizer(master.getRegionNormalizer());
} }
public void start() throws IOException { public void start() throws IOException, KeeperException {
if (!running.compareAndSet(false, true)) { if (!running.compareAndSet(false, true)) {
return; return;
} }
@ -227,6 +210,20 @@ public class AssignmentManager implements ServerListener {
// Start the Assignment Thread // Start the Assignment Thread
startAssignmentThread(); startAssignmentThread();
// load meta region state
ZKWatcher zkw = master.getZooKeeper();
// it could be null in some tests
if (zkw != null) {
RegionState regionState = MetaTableLocator.getMetaRegionState(zkw);
RegionStateNode regionStateNode =
regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
synchronized (regionStateNode) {
regionStateNode.setRegionLocation(regionState.getServerName());
regionStateNode.setState(regionState.getState());
setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
}
}
} }
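The new block above rebuilds the meta RegionStateNode from zookeeper instead of running RecoverMetaProcedure. Below is a sketch of the znode round trip it depends on, using only the static MetaTableLocator helpers visible elsewhere in this patch; the wrapper class itself is hypothetical.
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
final class MetaZnodeRoundTripSketch {
  static RegionState.State writeThenRead(ZKWatcher zkw, ServerName sn) throws KeeperException {
    // Persist the location together with its state, as the reworked
    // RegionStateStore.updateMetaLocation() now does.
    MetaTableLocator.setMetaLocation(zkw, sn, RegionInfo.DEFAULT_REPLICA_ID,
        RegionState.State.OPEN);
    // What AssignmentManager.start() reads back on the next master startup.
    return MetaTableLocator.getMetaRegionState(zkw).getState();
  }
}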
public void stop() { public void stop() {
@ -257,9 +254,8 @@ public class AssignmentManager implements ServerListener {
// Update meta events (for testing) // Update meta events (for testing)
if (hasProcExecutor) { if (hasProcExecutor) {
metaLoadEvent.suspend(); metaLoadEvent.suspend();
setFailoverCleanupDone(false);
for (RegionInfo hri: getMetaRegionSet()) { for (RegionInfo hri: getMetaRegionSet()) {
setMetaInitialized(hri, false); setMetaAssigned(hri, false);
} }
} }
} }
@ -288,7 +284,7 @@ public class AssignmentManager implements ServerListener {
return getProcedureEnvironment().getProcedureScheduler(); return getProcedureEnvironment().getProcedureScheduler();
} }
protected int getAssignMaxAttempts() { int getAssignMaxAttempts() {
return assignMaxAttempts; return assignMaxAttempts;
} }
@ -308,18 +304,6 @@ public class AssignmentManager implements ServerListener {
return this.listeners.remove(listener); return this.listeners.remove(listener);
} }
public void setRegionStateListener(final RegionStateListener listener) {
this.regionStateListener = listener;
}
public void setRegionNormalizer(final RegionNormalizer normalizer) {
this.regionNormalizer = normalizer;
}
public RegionNormalizer getRegionNormalizer() {
return regionNormalizer;
}
public RegionStates getRegionStates() { public RegionStates getRegionStates() {
return regionStates; return regionStates;
} }
@ -371,12 +355,8 @@ public class AssignmentManager implements ServerListener {
} }
public boolean isCarryingMeta(final ServerName serverName) { public boolean isCarryingMeta(final ServerName serverName) {
for (RegionInfo hri: getMetaRegionSet()) { // TODO: handle multiple meta
if (isCarryingRegion(serverName, hri)) { return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
return true;
}
}
return false;
} }
private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) { private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
@ -402,49 +382,66 @@ public class AssignmentManager implements ServerListener {
// ============================================================================================ // ============================================================================================
// META Event(s) helpers // META Event(s) helpers
// ============================================================================================ // ============================================================================================
public boolean isMetaInitialized() { /**
return metaInitializedEvent.isReady(); * Notice that this only means the meta region is available on a RS, but the AM may still be
* loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
* before checking this one, unless you can make sure that your piece of code can only be
* executed after AM has built the region states.
* @see #isMetaLoaded()
*/
public boolean isMetaAssigned() {
return metaAssignEvent.isReady();
} }
public boolean isMetaRegionInTransition() { public boolean isMetaRegionInTransition() {
return !isMetaInitialized(); return !isMetaAssigned();
} }
public boolean waitMetaInitialized(final Procedure proc) { /**
// TODO: handle multiple meta. should this wait on all meta? * Notice that this event does not mean the AM has already finished region state rebuilding. See
// this is used by the ServerCrashProcedure... * the comment of {@link #isMetaAssigned()} for more details.
return waitMetaInitialized(proc, RegionInfoBuilder.FIRST_META_REGIONINFO); * @see #isMetaAssigned()
*/
public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
} }
public boolean waitMetaInitialized(final Procedure proc, final RegionInfo regionInfo) { private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
return getMetaInitializedEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
}
private void setMetaInitialized(final RegionInfo metaRegionInfo, final boolean isInitialized) {
assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
final ProcedureEvent metaInitEvent = getMetaInitializedEvent(metaRegionInfo); ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
if (isInitialized) { if (assigned) {
metaInitEvent.wake(getProcedureScheduler()); metaAssignEvent.wake(getProcedureScheduler());
} else { } else {
metaInitEvent.suspend(); metaAssignEvent.suspend();
} }
} }
private ProcedureEvent getMetaInitializedEvent(final RegionInfo metaRegionInfo) { private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
// TODO: handle multiple meta. // TODO: handle multiple meta.
return metaInitializedEvent; return metaAssignEvent;
} }
public boolean waitMetaLoaded(final Procedure proc) { /**
* Wait until AM finishes the meta loading, i.e., the region states rebuilding.
* @see #isMetaLoaded()
* @see #waitMetaAssigned(Procedure, RegionInfo)
*/
public boolean waitMetaLoaded(Procedure<?> proc) {
return metaLoadEvent.suspendIfNotReady(proc); return metaLoadEvent.suspendIfNotReady(proc);
} }
protected void wakeMetaLoadedEvent() { @VisibleForTesting
void wakeMetaLoadedEvent() {
metaLoadEvent.wake(getProcedureScheduler()); metaLoadEvent.wake(getProcedureScheduler());
assert isMetaLoaded() : "expected meta to be loaded"; assert isMetaLoaded() : "expected meta to be loaded";
} }
/**
* Return whether AM has finished the meta loading, i.e., the region states rebuilding.
* @see #isMetaAssigned()
* @see #waitMetaLoaded(Procedure)
*/
public boolean isMetaLoaded() { public boolean isMetaLoaded() {
return metaLoadEvent.isReady(); return metaLoadEvent.isReady();
} }
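To make the ordering prescribed by the two javadocs above concrete, here is an illustrative guard, not from the patch, for master-side code that needs rebuilt region states:
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
final class MetaEventOrderSketch {
  static boolean regionStatesUsable(AssignmentManager am) {
    // isMetaAssigned() only says some RS is serving meta; isMetaLoaded() is
    // the stronger condition, meaning the AM has also rebuilt region states
    // from the meta scan, so it is the one to consult first (or alone).
    return am.isMetaLoaded();
  }
}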
@ -849,7 +846,7 @@ public class AssignmentManager implements ServerListener {
private void updateRegionTransition(final ServerName serverName, final TransitionCode state, private void updateRegionTransition(final ServerName serverName, final TransitionCode state,
final RegionInfo regionInfo, final long seqId) final RegionInfo regionInfo, final long seqId)
throws PleaseHoldException, UnexpectedStateException { throws PleaseHoldException, UnexpectedStateException {
checkFailoverCleanupCompleted(regionInfo); checkMetaLoaded(regionInfo);
final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
if (regionNode == null) { if (regionNode == null) {
@ -890,7 +887,7 @@ public class AssignmentManager implements ServerListener {
private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state, private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB) final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB)
throws IOException { throws IOException {
checkFailoverCleanupCompleted(parent); checkMetaLoaded(parent);
if (state != TransitionCode.READY_TO_SPLIT) { if (state != TransitionCode.READY_TO_SPLIT) {
throw new UnexpectedStateException("unsupported split regionState=" + state + throw new UnexpectedStateException("unsupported split regionState=" + state +
@ -922,7 +919,7 @@ public class AssignmentManager implements ServerListener {
private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state, private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException { final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
checkFailoverCleanupCompleted(merged); checkMetaLoaded(merged);
if (state != TransitionCode.READY_TO_MERGE) { if (state != TransitionCode.READY_TO_MERGE) {
throw new UnexpectedStateException("Unsupported merge regionState=" + state + throw new UnexpectedStateException("Unsupported merge regionState=" + state +
@ -1063,7 +1060,7 @@ public class AssignmentManager implements ServerListener {
} }
} }
protected boolean waitServerReportEvent(final ServerName serverName, final Procedure proc) { protected boolean waitServerReportEvent(ServerName serverName, Procedure<?> proc) {
final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
if (serverNode == null) { if (serverNode == null) {
LOG.warn("serverName=null; {}", proc); LOG.warn("serverName=null; {}", proc);
@ -1152,7 +1149,7 @@ public class AssignmentManager implements ServerListener {
public Collection<RegionState> getRegionOverThreshold() { public Collection<RegionState> getRegionOverThreshold() {
Map<String, RegionState> m = this.ritsOverThreshold; Map<String, RegionState> m = this.ritsOverThreshold;
return m != null? m.values(): Collections.EMPTY_SET; return m != null? m.values(): Collections.emptySet();
} }
public boolean isRegionOverThreshold(final RegionInfo regionInfo) { public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
@ -1209,27 +1206,44 @@ public class AssignmentManager implements ServerListener {
// TODO: Master load/bootstrap // TODO: Master load/bootstrap
// ============================================================================================ // ============================================================================================
public void joinCluster() throws IOException { public void joinCluster() throws IOException {
final long startTime = System.currentTimeMillis(); long startTime = System.nanoTime();
LOG.debug("Joining cluster..."); LOG.debug("Joining cluster...");
// Scan hbase:meta to build list of existing regions, servers, and assignment // Scan hbase:meta to build list of existing regions, servers, and assignment
// hbase:meta is online when we get to here and TableStateManager has been started. // hbase:meta is online when we get to here and TableStateManager has been started.
loadMeta(); loadMeta();
for (int i = 0; master.getServerManager().countOfRegionServers() < 1; ++i) { while (master.getServerManager().countOfRegionServers() < 1) {
LOG.info("Waiting for RegionServers to join; current count=" + LOG.info("Waiting for RegionServers to join; current count={}",
master.getServerManager().countOfRegionServers()); master.getServerManager().countOfRegionServers());
Threads.sleep(250); Threads.sleep(250);
} }
LOG.info("Number of RegionServers=" + master.getServerManager().countOfRegionServers()); LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
boolean failover = processofflineServersWithOnlineRegions(); processOfflineRegions();
// Start the RIT chore // Start the RIT chore
master.getMasterProcedureExecutor().addChore(this.ritChore); master.getMasterProcedureExecutor().addChore(this.ritChore);
LOG.info(String.format("Joined the cluster in %s, failover=%s", long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
StringUtils.humanTimeDiff(System.currentTimeMillis() - startTime), failover)); LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
}
// Create assign procedures for offline regions.
// Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to
// deal with dead servers any more, we only deal with the regions in OFFLINE state in this method.
// And it is a bit strange that, for new regions, we will add them in CLOSED state instead of
// OFFLINE state, and usually there will be a procedure to track them. The old
// processofflineServersWithOnlineRegions is a legacy from long ago; as things are really
// different now, maybe we do not need this method any more. Need to revisit later.
private void processOfflineRegions() {
List<RegionInfo> offlineRegions = regionStates.getRegionStates().stream()
.filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable()))
.map(RegionState::getRegion).collect(Collectors.toList());
if (!offlineRegions.isEmpty()) {
master.getMasterProcedureExecutor().submitProcedures(
master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions));
}
} }
private void loadMeta() throws IOException { private void loadMeta() throws IOException {
@ -1286,117 +1300,21 @@ public class AssignmentManager implements ServerListener {
} }
/** /**
* Look at what is in meta and the list of servers that have checked in and make reconciliation. * Used to check if the meta loading is done.
* We cannot tell definitively the difference between a clean shutdown and a cluster that has * <p/>
* been crashed down. At this stage of a Master startup, they look the same: they have the
* same state in hbase:meta. We could do detective work probing ZK and the FS for old WALs to
* split but SCP does this already so just let it do its job.
* <p>The profiles of clean shutdown and cluster crash-down are the same because on clean
* shutdown currently, we do not update hbase:meta with region close state (In AMv2, region
* state is kept in hbase:meta). Usually the master runs all region transitions as of AMv2 but on
* cluster controlled shutdown, the RegionServers close all their regions only reporting the
* final change to the Master. Currently this report is ignored. Later we could take it and
* update as many regions as we can before hbase:meta goes down or have the master run the
* close of all regions out on the cluster but we may never be able to achieve the proper state on
* all regions (at least not w/o lots of painful manipulations and waiting) so clean shutdown
* might not be possible especially on big clusters. And clean shutdown will take time. Given
* this current state of affairs, we just run ServerCrashProcedure in both cases. It will always
* do the right thing.
* @return True if for sure this is a failover where a Master is starting up into an already
* running cluster.
*/
// The assumption here is that if RSs are crashing while we are executing this
// they will be handled by the SSH that are put in the ServerManager deadservers "queue".
private boolean processofflineServersWithOnlineRegions() {
boolean deadServers = !master.getServerManager().getDeadServers().isEmpty();
final Set<ServerName> offlineServersWithOnlineRegions = new HashSet<>();
int size = regionStates.getRegionStateNodes().size();
final List<RegionInfo> offlineRegionsToAssign = new ArrayList<>(size);
// If deadservers then its a failover, else, we are not sure yet.
boolean failover = deadServers;
for (RegionStateNode regionNode: regionStates.getRegionStateNodes()) {
// Region State can be OPEN even if we did controlled cluster shutdown; Master does not close
// the regions in this case. The RegionServer does the close, so the state in
// hbase:meta is not updated -- Master does all updates -- and is left with OPEN as region
// state in meta. How to tell the difference between ordered shutdown and crashed-down cluster
// then? We can't. Not currently. Perhaps if we updated hbase:meta with CLOSED on ordered
// shutdown. This would slow shutdown though and not all edits would make it in anyways.
// TODO: Examine.
// Because we can't be sure it was an ordered shutdown, we always run ServerCrashProcedure.
// ServerCrashProcedure will try to retain old deploy when it goes to assign.
if (regionNode.getState() == State.OPEN) {
final ServerName serverName = regionNode.getRegionLocation();
if (!master.getServerManager().isServerOnline(serverName)) {
offlineServersWithOnlineRegions.add(serverName);
} else {
// Server is online. This a failover. Master is starting into already-running cluster.
failover = true;
}
} else if (regionNode.getState() == State.OFFLINE) {
if (isTableEnabled(regionNode.getTable())) {
offlineRegionsToAssign.add(regionNode.getRegionInfo());
}
}
}
// Kill servers with online regions just-in-case. Runs ServerCrashProcedure.
for (ServerName serverName: offlineServersWithOnlineRegions) {
if (!master.getServerManager().isServerOnline(serverName)) {
LOG.info("KILL RegionServer=" + serverName + " hosting regions but not online.");
killRegionServer(serverName);
}
}
setFailoverCleanupDone(true);
// Assign offline regions. Uses round-robin.
if (offlineRegionsToAssign.size() > 0) {
master.getMasterProcedureExecutor().submitProcedures(master.getAssignmentManager().
createRoundRobinAssignProcedures(offlineRegionsToAssign));
}
return failover;
}
/**
* Used by ServerCrashProcedure to make sure AssignmentManager has completed
* the failover cleanup before re-assigning regions of dead servers, so that
* when re-assignment happens, AssignmentManager has proper region states.
*/
public boolean isFailoverCleanupDone() {
return failoverCleanupDone.isReady();
}
/**
* Used by ServerCrashProcedure tests to verify the ability to suspend the
* execution of the ServerCrashProcedure.
*/
@VisibleForTesting
public void setFailoverCleanupDone(final boolean b) {
master.getMasterProcedureExecutor().getEnvironment()
.setEventReady(failoverCleanupDone, b);
}
public ProcedureEvent getFailoverCleanupEvent() {
return failoverCleanupDone;
}
/**
* Used to check if the failover cleanup is done.
* If not, we throw PleaseHoldException since we are rebuilding the RegionStates. * If not, we throw PleaseHoldException since we are rebuilding the RegionStates.
* @param hri region to check if it has already been rebuilt * @param hri region to check if it has already been rebuilt
* @throws PleaseHoldException if the failover cleanup is not completed * @throws PleaseHoldException if meta has not been loaded yet
*/ */
private void checkFailoverCleanupCompleted(final RegionInfo hri) throws PleaseHoldException { private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
if (!isRunning()) { if (!isRunning()) {
throw new PleaseHoldException("AssignmentManager not running"); throw new PleaseHoldException("AssignmentManager not running");
} }
// TODO: can we avoid throwing an exception if hri is already loaded?
// at the moment we bypass only meta
boolean meta = isMetaRegion(hri); boolean meta = isMetaRegion(hri);
boolean cleanup = isFailoverCleanupDone(); boolean metaLoaded = isMetaLoaded();
if (!isMetaRegion(hri) && !isFailoverCleanupDone()) { if (!meta && !metaLoaded) {
String msg = "Master not fully online; hbase:meta=" + meta + ", failoverCleanup=" + cleanup; throw new PleaseHoldException(
throw new PleaseHoldException(msg); "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
} }
} }
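checkMetaLoaded() turns a too-early report into a PleaseHoldException rather than a hard failure. A sketch of the expected caller-side reaction, retry with a short backoff, follows; apart from PleaseHoldException and Threads.sleep (both visible in this patch), the names are illustrative.
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.util.Threads;
final class PleaseHoldRetrySketch {
  interface MasterCall {
    void run() throws Exception;
  }
  static void callWithRetry(MasterCall call) throws Exception {
    while (true) {
      try {
        call.run();
        return;
      } catch (PleaseHoldException e) {
        // The master is still rebuilding RegionStates from the meta scan;
        // back off briefly and report again instead of failing the caller.
        Threads.sleep(100);
      }
    }
  }
}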
@ -1539,7 +1457,7 @@ public class AssignmentManager implements ServerListener {
// can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
// which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
// on table that contains state. // on table that contains state.
setMetaInitialized(hri, true); setMetaAssigned(hri, true);
} }
regionStates.addRegionToServer(regionNode); regionStates.addRegionToServer(regionNode);
// TODO: OPENING Updates hbase:meta too... we need to do both here and there? // TODO: OPENING Updates hbase:meta too... we need to do both here and there?
@ -1555,7 +1473,7 @@ public class AssignmentManager implements ServerListener {
regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE); regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
// Set meta as not initialized early, so people trying to create/edit tables will wait // Set meta as not initialized early, so people trying to create/edit tables will wait
if (isMetaRegion(hri)) { if (isMetaRegion(hri)) {
setMetaInitialized(hri, false); setMetaAssigned(hri, false);
} }
regionStates.addRegionToServer(regionNode); regionStates.addRegionToServer(regionNode);
regionStateStore.updateRegionLocation(regionNode); regionStateStore.updateRegionLocation(regionNode);
@ -1831,7 +1749,7 @@ public class AssignmentManager implements ServerListener {
private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
final ProcedureEvent[] events = new ProcedureEvent[regions.size()]; final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
final long st = System.currentTimeMillis(); final long st = System.currentTimeMillis();
if (plan == null) { if (plan == null) {
@ -1883,7 +1801,7 @@ public class AssignmentManager implements ServerListener {
.map((s)->new Pair<>(s, master.getRegionServerVersion(s))) .map((s)->new Pair<>(s, master.getRegionServerVersion(s)))
.collect(Collectors.toList()); .collect(Collectors.toList());
if (serverList.isEmpty()) { if (serverList.isEmpty()) {
return Collections.EMPTY_LIST; return Collections.emptyList();
} }
String highestVersion = Collections.max(serverList, String highestVersion = Collections.max(serverList,
(o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond(); (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond();
@ -1909,11 +1827,6 @@ public class AssignmentManager implements ServerListener {
wakeServerReportEvent(serverNode); wakeServerReportEvent(serverNode);
} }
private void killRegionServer(final ServerName serverName) {
final ServerStateNode serverNode = regionStates.getServerNode(serverName);
killRegionServer(serverNode);
}
private void killRegionServer(final ServerStateNode serverNode) { private void killRegionServer(final ServerStateNode serverNode) {
master.getServerManager().expireServer(serverNode.getServerName()); master.getServerManager().expireServer(serverNode.getServerName());
} }

View File

@ -553,7 +553,7 @@ public class MergeTableRegionsProcedure
try { try {
env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion); env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion);
} catch (QuotaExceededException e) { } catch (QuotaExceededException e) {
env.getAssignmentManager().getRegionNormalizer().planSkipped(this.mergedRegion, env.getMasterServices().getRegionNormalizer().planSkipped(this.mergedRegion,
NormalizationPlan.PlanType.MERGE); NormalizationPlan.PlanType.MERGE);
throw e; throw e;
} }

View File

@ -128,23 +128,24 @@ public class RegionStateStore {
public void updateRegionLocation(RegionStates.RegionStateNode regionStateNode) public void updateRegionLocation(RegionStates.RegionStateNode regionStateNode)
throws IOException { throws IOException {
if (regionStateNode.getRegionInfo().isMetaRegion()) { if (regionStateNode.getRegionInfo().isMetaRegion()) {
updateMetaLocation(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); updateMetaLocation(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation(),
regionStateNode.getState());
} else { } else {
long openSeqNum = regionStateNode.getState() == State.OPEN ? long openSeqNum = regionStateNode.getState() == State.OPEN ? regionStateNode.getOpenSeqNum()
regionStateNode.getOpenSeqNum() : HConstants.NO_SEQNUM; : HConstants.NO_SEQNUM;
updateUserRegionLocation(regionStateNode.getRegionInfo(), regionStateNode.getState(), updateUserRegionLocation(regionStateNode.getRegionInfo(), regionStateNode.getState(),
regionStateNode.getRegionLocation(), regionStateNode.getLastHost(), openSeqNum, regionStateNode.getRegionLocation(), regionStateNode.getLastHost(), openSeqNum,
// The regionStateNode may have no procedure in a test scenario; allow for this. // The regionStateNode may have no procedure in a test scenario; allow for this.
regionStateNode.getProcedure() != null? regionStateNode.getProcedure() != null ? regionStateNode.getProcedure().getProcId()
regionStateNode.getProcedure().getProcId(): Procedure.NO_PROC_ID); : Procedure.NO_PROC_ID);
} }
} }
private void updateMetaLocation(final RegionInfo regionInfo, final ServerName serverName) private void updateMetaLocation(RegionInfo regionInfo, ServerName serverName, State state)
throws IOException { throws IOException {
try { try {
MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName, MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName, regionInfo.getReplicaId(),
regionInfo.getReplicaId(), State.OPEN); state);
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException(e); throw new IOException(e);
} }

View File

@ -421,9 +421,11 @@ public abstract class RegionTransitionProcedure
@Override @Override
protected LockState acquireLock(final MasterProcedureEnv env) { protected LockState acquireLock(final MasterProcedureEnv env) {
// Unless we are assigning meta, wait for meta to be available and loaded. // Unless we are assigning meta, wait for meta to be available and loaded.
if (!isMeta() && (env.waitFailoverCleanup(this) || if (!isMeta()) {
env.getAssignmentManager().waitMetaInitialized(this, getRegionInfo()))) { AssignmentManager am = env.getAssignmentManager();
return LockState.LOCK_EVENT_WAIT; if (am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo)) {
return LockState.LOCK_EVENT_WAIT;
}
} }
// TODO: Revisit this and move it to the executor // TODO: Revisit this and move it to the executor
@ -432,8 +434,7 @@ public abstract class RegionTransitionProcedure
LOG.debug(LockState.LOCK_EVENT_WAIT + " pid=" + getProcId() + " " + LOG.debug(LockState.LOCK_EVENT_WAIT + " pid=" + getProcId() + " " +
env.getProcedureScheduler().dumpLocks()); env.getProcedureScheduler().dumpLocks());
} catch (IOException e) { } catch (IOException e) {
// TODO Auto-generated catch block // ignore, just for logging
e.printStackTrace();
} }
return LockState.LOCK_EVENT_WAIT; return LockState.LOCK_EVENT_WAIT;
} }

View File

@ -498,7 +498,7 @@ public class SplitTableRegionProcedure
try { try {
env.getMasterServices().getMasterQuotaManager().onRegionSplit(this.getParentRegion()); env.getMasterServices().getMasterQuotaManager().onRegionSplit(this.getParentRegion());
} catch (QuotaExceededException e) { } catch (QuotaExceededException e) {
env.getAssignmentManager().getRegionNormalizer().planSkipped(this.getParentRegion(), env.getMasterServices().getRegionNormalizer().planSkipped(this.getParentRegion(),
NormalizationPlan.PlanType.SPLIT); NormalizationPlan.PlanType.SPLIT);
throw e; throw e;
} }

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.master.procedure; package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException; import java.io.IOException;
@ -31,7 +30,6 @@ import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionOfflineException; import org.apache.hadoop.hbase.client.RegionOfflineException;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.master.TableStateManager;
@ -92,7 +90,9 @@ public abstract class AbstractStateMachineTableProcedure<TState>
@Override @Override
protected LockState acquireLock(final MasterProcedureEnv env) { protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT; if (env.waitInitialized(this)) {
return LockState.LOCK_EVENT_WAIT;
}
if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) { if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
return LockState.LOCK_EVENT_WAIT; return LockState.LOCK_EVENT_WAIT;
} }

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.InitMetaState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.InitMetaStateData;
/**
* This procedure is used to initialize the meta table for a new HBase deploy. It will just
* schedule an {@link AssignProcedure} to assign meta.
*/
@InterfaceAudience.Private
public class InitMetaProcedure extends AbstractStateMachineTableProcedure<InitMetaState> {
private CountDownLatch latch = new CountDownLatch(1);
@Override
public TableName getTableName() {
return TableName.META_TABLE_NAME;
}
@Override
public TableOperationType getTableOperationType() {
return TableOperationType.CREATE;
}
@Override
protected Flow executeFromState(MasterProcedureEnv env, InitMetaState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
switch (state) {
case INIT_META_ASSIGN_META:
addChildProcedure(env.getAssignmentManager()
.createAssignProcedure(RegionInfoBuilder.FIRST_META_REGIONINFO));
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
@Override
protected LockState acquireLock(MasterProcedureEnv env) {
// we do not need to wait for the master to be initialized, as we are part of the initialization.
if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
return LockState.LOCK_EVENT_WAIT;
}
return LockState.LOCK_ACQUIRED;
}
@Override
protected void rollbackState(MasterProcedureEnv env, InitMetaState state)
throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected InitMetaState getState(int stateId) {
return InitMetaState.forNumber(stateId);
}
@Override
protected int getStateId(InitMetaState state) {
return state.getNumber();
}
@Override
protected InitMetaState getInitialState() {
return InitMetaState.INIT_META_ASSIGN_META;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(InitMetaStateData.getDefaultInstance());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
serializer.deserialize(InitMetaStateData.class);
}
@Override
protected void completionCleanup(MasterProcedureEnv env) {
latch.countDown();
}
public void await() throws InterruptedException {
latch.await();
}
}
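A sketch of the intended call pattern for a fresh deploy, based on the latch above: master startup submits the procedure and blocks until meta is assigned. The wrapper method is hypothetical and assumed to sit in the same package as InitMetaProcedure; submitProcedure() is the standard ProcedureExecutor entry point.
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
final class InitMetaUsageSketch {
  static void bootstrapMeta(ProcedureExecutor<MasterProcedureEnv> procExec)
      throws InterruptedException {
    InitMetaProcedure initMeta = new InitMetaProcedure();
    procExec.submitProcedure(initMeta);
    // completionCleanup() counts the latch down when the procedure finishes,
    // so await() returns only after meta assignment has been scheduled and run.
    initMeta.await();
  }
}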

View File

@ -19,13 +19,11 @@
package org.apache.hadoop.hbase.master.procedure; package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@ -70,8 +68,7 @@ public class MasterProcedureEnv implements ConfigurationObserver {
} }
private boolean isRunning() { private boolean isRunning() {
return master.isActiveMaster() && !master.isStopped() && return !master.isStopped() && !master.isStopping() && !master.isAborted();
!master.isStopping() && !master.isAborted();
} }
} }
@ -155,17 +152,6 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return master.getInitializedEvent().suspendIfNotReady(proc); return master.getInitializedEvent().suspendIfNotReady(proc);
} }
public boolean waitServerCrashProcessingEnabled(Procedure<?> proc) {
if (master instanceof HMaster) {
return ((HMaster)master).getServerCrashProcessingEnabledEvent().suspendIfNotReady(proc);
}
return false;
}
public boolean waitFailoverCleanup(Procedure<?> proc) {
return master.getAssignmentManager().getFailoverCleanupEvent().suspendIfNotReady(proc);
}
public void setEventReady(ProcedureEvent<?> event, boolean isReady) { public void setEventReady(ProcedureEvent<?> event, boolean isReady) {
if (isReady) { if (isReady) {
event.wake(procSched); event.wake(procSched);

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.master.procedure; package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException; import java.io.IOException;
@ -480,7 +479,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param table Table to lock * @param table Table to lock
* @return true if the procedure has to wait for the table to be available * @return true if the procedure has to wait for the table to be available
*/ */
public boolean waitTableExclusiveLock(final Procedure procedure, final TableName table) { public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
schedLock(); schedLock();
try { try {
final String namespace = table.getNamespaceAsString(); final String namespace = table.getNamespaceAsString();
@ -509,7 +508,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param procedure the procedure releasing the lock * @param procedure the procedure releasing the lock
* @param table the name of the table that has the exclusive lock * @param table the name of the table that has the exclusive lock
*/ */
public void wakeTableExclusiveLock(final Procedure procedure, final TableName table) { public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
schedLock(); schedLock();
try { try {
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
@ -537,7 +536,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param table Table to lock * @param table Table to lock
* @return true if the procedure has to wait for the table to be available * @return true if the procedure has to wait for the table to be available
*/ */
public boolean waitTableSharedLock(final Procedure procedure, final TableName table) { public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
return waitTableQueueSharedLock(procedure, table) == null; return waitTableQueueSharedLock(procedure, table) == null;
} }
@ -568,7 +567,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param procedure the procedure releasing the lock * @param procedure the procedure releasing the lock
* @param table the name of the table that has the shared lock * @param table the name of the table that has the shared lock
*/ */
public void wakeTableSharedLock(final Procedure procedure, final TableName table) { public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
schedLock(); schedLock();
try { try {
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
@ -629,7 +628,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param regionInfo the region we are trying to lock * @param regionInfo the region we are trying to lock
* @return true if the procedure has to wait for the regions to be available * @return true if the procedure has to wait for the regions to be available
*/ */
-  public boolean waitRegion(final Procedure procedure, final RegionInfo regionInfo) {
+  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
     return waitRegions(procedure, regionInfo.getTable(), regionInfo);
   }
@@ -640,7 +639,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
    * @param regionInfo the list of regions we are trying to lock
    * @return true if the procedure has to wait for the regions to be available
    */
-  public boolean waitRegions(final Procedure procedure, final TableName table,
+  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
       final RegionInfo... regionInfo) {
     Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
     schedLock();
@@ -688,7 +687,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
    * @param procedure the procedure that was holding the region
    * @param regionInfo the region the procedure was holding
    */
-  public void wakeRegion(final Procedure procedure, final RegionInfo regionInfo) {
+  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
     wakeRegions(procedure, regionInfo.getTable(), regionInfo);
   }
@@ -697,7 +696,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
    * @param procedure the procedure that was holding the regions
    * @param regionInfo the list of regions the procedure was holding
    */
-  public void wakeRegions(final Procedure procedure,final TableName table,
+  public void wakeRegions(final Procedure<?> procedure,final TableName table,
       final RegionInfo... regionInfo) {
     Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
     schedLock();
@@ -744,7 +743,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
    * @param namespace Namespace to lock
    * @return true if the procedure has to wait for the namespace to be available
    */
-  public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String namespace) {
+  public boolean waitNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
     schedLock();
     try {
       final LockAndQueue systemNamespaceTableLock =
@@ -775,7 +774,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
    * @param procedure the procedure releasing the lock
    * @param namespace the namespace that has the exclusive lock
    */
-  public void wakeNamespaceExclusiveLock(final Procedure procedure, final String namespace) {
+  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
     schedLock();
     try {
       final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
@@ -893,7 +892,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
    * @see #wakeMetaExclusiveLock(Procedure)
    * @param procedure the procedure trying to acquire the lock
    * @return true if the procedure has to wait for meta to be available
+   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
+   *             {@link RecoverMetaProcedure}.
    */
+  @Deprecated
   public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
     schedLock();
     try {
@@ -914,7 +916,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
    * Wake the procedures waiting for meta.
    * @see #waitMetaExclusiveLock(Procedure)
    * @param procedure the procedure releasing the lock
+   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
+   *             {@link RecoverMetaProcedure}.
    */
+  @Deprecated
   public void wakeMetaExclusiveLock(Procedure<?> procedure) {
     schedLock();
     try {
@@ -19,6 +19,11 @@ package org.apache.hadoop.hbase.master.procedure;
 import org.apache.yetus.audience.InterfaceAudience;
+/**
+ * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
+ *             {@link RecoverMetaProcedure}.
+ */
+@Deprecated
 @InterfaceAudience.Private
 public interface MetaProcedureInterface {
@@ -22,6 +22,11 @@ import org.apache.hadoop.hbase.procedure2.LockStatus;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.yetus.audience.InterfaceAudience;
+/**
+ * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
+ *             {@link RecoverMetaProcedure}.
+ */
+@Deprecated
 @InterfaceAudience.Private
 class MetaQueue extends Queue<TableName> {
@@ -48,7 +48,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
  * This procedure recovers meta from prior shutdown/ crash of a server, and brings meta online by
  * assigning meta region/s. Any place where meta is accessed and requires meta to be online, need to
  * submit this procedure instead of duplicating steps to recover meta in the code.
+ * <p/>
+ * @deprecated Do not use any more; kept here only for compatibility. The recovery work is now
+ *             done in {@link ServerCrashProcedure} directly, and the initial work for the meta
+ *             table is done by {@link InitMetaProcedure}.
+ * @see ServerCrashProcedure
+ * @see InitMetaProcedure
  */
+@Deprecated
 @InterfaceAudience.Private
 public class RecoverMetaProcedure
     extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.RecoverMetaState>
@@ -281,7 +288,7 @@ public class RecoverMetaProcedure
    * already initialized
    */
   private boolean isRunRequired() {
-    return failedMetaServer != null || !master.getAssignmentManager().isMetaInitialized();
+    return failedMetaServer != null || !master.getAssignmentManager().isMetaAssigned();
   }

   /**
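Note why RecoverMetaProcedure is deprecated rather than deleted: on restart the master replays the ProcedureStore and resolves persisted procedures by class, so a retired procedure class has to stay on the classpath until no store can still contain an instance of it. A minimal, self-contained sketch of that constraint follows; ProcedureLoader and OldProcedure are hypothetical names for illustration, not HBase classes.

import java.util.List;

public class ProcedureLoader {
  /** A retired procedure, kept only so old store entries can still be deserialized. */
  @Deprecated
  public static class OldProcedure implements Runnable {
    @Override
    public void run() { /* no-op: only replayed to completion, never newly scheduled */ }
  }

  /** Replay previously persisted procedures by their recorded class names. */
  public static void replay(List<String> persistedClassNames) throws Exception {
    for (String className : persistedClassNames) {
      // Class.forName throws ClassNotFoundException if the class was deleted,
      // which would abort recovery; keeping the deprecated class avoids that.
      Runnable proc = (Runnable) Class.forName(className)
          .getDeclaredConstructor().newInstance();
      proc.run();
    }
  }

  public static void main(String[] args) throws Exception {
    replay(List.of(ProcedureLoader.OldProcedure.class.getName()));
  }
}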
@@ -78,6 +78,11 @@ class SchemaLocking {
     return getLock(regionLocks, encodedRegionName);
   }
+  /**
+   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
+   *             {@link RecoverMetaProcedure}.
+   */
+  @Deprecated
   LockAndQueue getMetaLock() {
     return metaLock;
   }
@@ -19,12 +19,14 @@ package org.apache.hadoop.hbase.master.procedure;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.MasterWalManager;
@@ -83,11 +85,8 @@ public class ServerCrashProcedure
    * @param shouldSplitWal True if we should split WALs as part of crashed server processing.
    * @param carryingMeta True if carrying hbase:meta table region.
    */
-  public ServerCrashProcedure(
-      final MasterProcedureEnv env,
-      final ServerName serverName,
-      final boolean shouldSplitWal,
-      final boolean carryingMeta) {
+  public ServerCrashProcedure(final MasterProcedureEnv env, final ServerName serverName,
+      final boolean shouldSplitWal, final boolean carryingMeta) {
     this.serverName = serverName;
     this.shouldSplitWal = shouldSplitWal;
     this.carryingMeta = carryingMeta;
@@ -119,18 +118,32 @@ public class ServerCrashProcedure
         LOG.info("Start " + this);
         // If carrying meta, process it first. Else, get list of regions on crashed server.
         if (this.carryingMeta) {
-          setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META);
+          setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
         } else {
           setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
         }
         break;
+      case SERVER_CRASH_SPLIT_META_LOGS:
+        splitMetaLogs(env);
+        setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
+        break;
+      case SERVER_CRASH_ASSIGN_META:
+        handleRIT(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO));
+        addChildProcedure(env.getAssignmentManager()
+          .createAssignProcedure(RegionInfoBuilder.FIRST_META_REGIONINFO));
+        setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
+        break;
+      case SERVER_CRASH_PROCESS_META:
+        // Not used any more, but kept for compatibility: an old SCP persisted in the
+        // ProcedureStore may still be in this state.
+        processMeta(env);
+        setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
+        break;
       case SERVER_CRASH_GET_REGIONS:
         // If hbase:meta is not assigned, yield.
         if (env.getAssignmentManager().waitMetaLoaded(this)) {
           throw new ProcedureSuspendedException();
         }
         this.regionsOnCrashedServer = services.getAssignmentManager().getRegionStates()
           .getServerRegionInfoSet(serverName);
         // Where to go next? Depends on whether we should split logs at all or
@@ -141,17 +154,10 @@ public class ServerCrashProcedure
           setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
         }
         break;
-      case SERVER_CRASH_PROCESS_META:
-        processMeta(env);
-        setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
-        break;
       case SERVER_CRASH_SPLIT_LOGS:
         splitLogs(env);
         setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
         break;
       case SERVER_CRASH_ASSIGN:
         // If no regions to assign, skip assign and skip to the finish.
         // Filter out meta regions. Those are handled elsewhere in this procedure.
@@ -177,18 +183,15 @@ public class ServerCrashProcedure
           setNextState(ServerCrashState.SERVER_CRASH_FINISH);
         }
         break;
       case SERVER_CRASH_HANDLE_RIT2:
         // Noop. Left in place because we used to call handleRIT here for a second time
         // but no longer necessary since HBASE-20634.
         setNextState(ServerCrashState.SERVER_CRASH_FINISH);
         break;
       case SERVER_CRASH_FINISH:
         services.getAssignmentManager().getRegionStates().removeServer(serverName);
         services.getServerManager().getDeadServers().finish(serverName);
         return Flow.NO_MORE_STATE;
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);
     }
@@ -198,11 +201,6 @@ public class ServerCrashProcedure
     return Flow.HAS_MORE_STATE;
   }
-  /**
-   * @param env
-   * @throws IOException
-   */
   private void processMeta(final MasterProcedureEnv env) throws IOException {
     LOG.debug("{}; processing hbase:meta", this);
@@ -227,10 +225,18 @@ public class ServerCrashProcedure
            RegionReplicaUtil.isDefaultReplica(hri);
   }
+  private void splitMetaLogs(MasterProcedureEnv env) throws IOException {
+    LOG.debug("Splitting meta WALs {}", this);
+    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
+    AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    am.getRegionStates().metaLogSplitting(serverName);
+    mwm.splitMetaLog(serverName);
+    am.getRegionStates().metaLogSplit(serverName);
+    LOG.debug("Done splitting meta WALs {}", this);
+  }
   private void splitLogs(final MasterProcedureEnv env) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Splitting WALs " + this);
-    }
+    LOG.debug("Splitting WALs {}", this);
     MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
     // TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running.
@@ -271,9 +277,6 @@ public class ServerCrashProcedure
   @Override
   protected LockState acquireLock(final MasterProcedureEnv env) {
-    // TODO: Put this BACK AFTER AMv2 goes in!!!!
-    // if (env.waitFailoverCleanup(this)) return LockState.LOCK_EVENT_WAIT;
-    if (env.waitServerCrashProcessingEnabled(this)) return LockState.LOCK_EVENT_WAIT;
     if (env.getProcedureScheduler().waitServerExclusiveLock(this, getServerName())) {
       return LockState.LOCK_EVENT_WAIT;
     }
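The hunks above reroute the meta-carrying path of the state machine: instead of one SERVER_CRASH_PROCESS_META step (now kept only to replay already-persisted procedures), a crash of the meta host goes SPLIT_META_LOGS, then ASSIGN_META, then GET_REGIONS. A self-contained sketch of that ordering follows; the enum and driver are illustrative only, not the real StateMachineProcedure.

enum CrashState { START, SPLIT_META_LOGS, ASSIGN_META, GET_REGIONS, SPLIT_LOGS, ASSIGN, FINISH }

public class CrashFlowSketch {
  public static void main(String[] args) {
    boolean carryingMeta = true; // toggle to see the non-meta path
    CrashState state = CrashState.START;
    while (state != CrashState.FINISH) {
      System.out.println("state=" + state);
      switch (state) {
        case START:
          state = carryingMeta ? CrashState.SPLIT_META_LOGS : CrashState.GET_REGIONS;
          break;
        case SPLIT_META_LOGS: // splitMetaLogs(env) in the real procedure
          state = CrashState.ASSIGN_META;
          break;
        case ASSIGN_META:     // assign hbase:meta before touching user regions
          state = CrashState.GET_REGIONS;
          break;
        case GET_REGIONS:     // collect the regions of the dead server
          state = CrashState.SPLIT_LOGS;
          break;
        case SPLIT_LOGS:
          state = CrashState.ASSIGN;
          break;
        case ASSIGN:
          state = CrashState.FINISH;
          break;
        default:
          throw new IllegalStateException("unhandled state=" + state);
      }
    }
    System.out.println("state=" + state);
  }
}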
@@ -108,8 +108,6 @@ public class TestMetaTableAccessor {
    * Does {@link MetaTableAccessor#getRegion(Connection, byte[])} and a write
    * against hbase:meta while its hosted server is restarted to prove our retrying
    * works.
-   * @throws IOException
-   * @throws InterruptedException
    */
   @Test public void testRetrying()
   throws IOException, InterruptedException {
@@ -20,10 +20,8 @@ package org.apache.hadoop.hbase.master;
 import static org.mockito.Mockito.mock;
+import com.google.protobuf.Service;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.ChoreService;
@@ -57,8 +55,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import com.google.protobuf.Service;
 public class MockNoopMasterServices implements MasterServices {
   private final Configuration conf;
   private final MetricsMaster metricsMaster;
@@ -214,16 +210,6 @@ public class MockNoopMasterServices implements MasterServices {
     return null;
   }
-  private boolean serverCrashProcessingEnabled = true;
-
-  public void setServerCrashProcessingEnabled(boolean b) {
-    serverCrashProcessingEnabled = b;
-  }
-
-  @Override
-  public boolean isServerCrashProcessingEnabled() {
-    return serverCrashProcessingEnabled;
-  }
   @Override
   public boolean registerService(Service instance) {
     return false;
@@ -452,11 +438,6 @@ public class MockNoopMasterServices implements MasterServices {
   public void checkIfShouldMoveSystemRegionAsync() {
   }
-  @Override
-  public boolean recoverMeta() throws IOException {
-    return false;
-  }
   @Override
   public String getClientIdAuditPrefix() {
     return null;
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.Triple;
+import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -90,7 +91,7 @@ public class TestCatalogJanitor {
   }
   @Before
-  public void setup() throws IOException {
+  public void setup() throws IOException, KeeperException {
     setRootDirAndCleanIt(HTU, this.name.getMethodName());
     NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers =
       new ConcurrentSkipListMap<ServerName, SortedSet<byte []>>();
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.SortedSet;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
@@ -56,11 +55,18 @@ import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.security.Superusers;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.zookeeper.KeeperException;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -70,10 +76,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;

 /**
@@ -174,7 +176,7 @@ public class MockMasterServices extends MockNoopMasterServices {
   }
   public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher)
-      throws IOException {
+      throws IOException, KeeperException {
     startProcedureExecutor(remoteDispatcher);
     this.assignmentManager.start();
     for (int i = 0; i < numServes; ++i) {
@@ -217,17 +219,14 @@ public class MockMasterServices extends MockNoopMasterServices {
   private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher)
       throws IOException {
     final Configuration conf = getConfiguration();
-    final Path logDir = new Path(fileSystemManager.getRootDir(),
-      WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
     this.procedureStore = new NoopProcedureStore();
     this.procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
     this.procedureEnv = new MasterProcedureEnv(this,
       remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this));
-    this.procedureExecutor = new ProcedureExecutor(conf, procedureEnv, procedureStore,
+    this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore,
       procedureEnv.getProcedureScheduler());
     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
       Math.max(Runtime.getRuntime().availableProcessors(),
@@ -236,7 +235,7 @@ public class MockMasterServices extends MockNoopMasterServices {
       MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
       MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
     this.procedureStore.start(numThreads);
-    this.procedureExecutor.start(numThreads, abortOnCorruption);
+    ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption);
     this.procedureEnv.getRemoteDispatcher().start();
   }
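The mock no longer calls procedureExecutor.start(...); it goes through ProcedureTestingUtility.initAndStartWorkers, matching the split of executor startup into an init phase (lease recovery and store load) and a later worker-start phase. A toy sketch of why that split is useful follows; TinyExecutor is hypothetical, not the real ProcedureExecutor, and only shows that submissions can be accepted before any worker runs.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class TinyExecutor {
  private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
  private final AtomicLong lastProcId = new AtomicLong(-1);
  private final AtomicBoolean running = new AtomicBoolean(false);
  private Thread[] workers;

  /** Phase 1: recover state and size the pool, but do not execute anything yet. */
  public void init(int numThreads) {
    lastProcId.set(0); // stands in for replaying the store to find the max proc id
    workers = new Thread[numThreads];
  }

  /** Phase 2: only now do queued procedures actually run. */
  public void startWorkers() {
    running.set(true);
    for (int i = 0; i < workers.length; i++) {
      workers[i] = new Thread(() -> {
        while (running.get()) {
          try {
            queue.take().run();
          } catch (InterruptedException e) {
            return; // stop() interrupts us
          }
        }
      });
      workers[i].start();
    }
  }

  public long submit(Runnable proc) {
    queue.add(proc);
    return lastProcId.incrementAndGet();
  }

  public void stop() {
    running.set(false);
    for (Thread w : workers) {
      w.interrupt();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    TinyExecutor exec = new TinyExecutor();
    exec.init(2);
    long id = exec.submit(() -> System.out.println("ran after workers started"));
    System.out.println("submitted proc " + id + " before any worker existed");
    exec.startWorkers(); // analogous to the worker-start half of initAndStartWorkers
    Thread.sleep(100);
    exec.stop();
  }
}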
@@ -150,7 +150,6 @@ public class TestAssignmentManager {
     rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
     am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
     am.wakeMetaLoadedEvent();
-    am.setFailoverCleanupDone(true);
   }
   @After
@@ -427,18 +426,15 @@ public class TestAssignmentManager {
     am = master.getAssignmentManager();
     // Assign meta
-    master.setServerCrashProcessingEnabled(false);
     rsDispatcher.setMockRsExecutor(new HangThenRSRestartExecutor());
     am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
-    assertEquals(true, am.isMetaInitialized());
+    assertEquals(true, am.isMetaAssigned());
     // set it back as default, see setUpMeta()
-    master.setServerCrashProcessingEnabled(true);
     am.wakeMetaLoadedEvent();
-    am.setFailoverCleanupDone(true);
   }
-  private Future<byte[]> submitProcedure(final Procedure proc) {
+  private Future<byte[]> submitProcedure(final Procedure<?> proc) {
     return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
   }
@@ -449,7 +445,7 @@ public class TestAssignmentManager {
       LOG.info("ExecutionException", e);
       Exception ee = (Exception)e.getCause();
       if (ee instanceof InterruptedIOException) {
-        for (Procedure p: this.master.getMasterProcedureExecutor().getProcedures()) {
+        for (Procedure<?> p: this.master.getMasterProcedureExecutor().getProcedures()) {
           LOG.info(p.toStringDetails());
         }
       }
@@ -493,13 +489,6 @@ public class TestAssignmentManager {
     return proc;
   }
-  private UnassignProcedure createAndSubmitUnassign(TableName tableName, int regionId) {
-    RegionInfo hri = createRegionInfo(tableName, regionId);
-    UnassignProcedure proc = am.createUnassignProcedure(hri, null, false);
-    master.getMasterProcedureExecutor().submitProcedure(proc);
-    return proc;
-  }
   private RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
     return RegionInfoBuilder.newBuilder(tableName)
       .setStartKey(Bytes.toBytes(regionId))
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.MasterMetaBootstrap;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@@ -85,7 +84,6 @@ public class MasterProcedureTestingUtility {
       env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
     }
     am.stop();
-    master.setServerCrashProcessingEnabled(false);
     master.setInitialized(false);
     return null;
   }
@@ -96,9 +94,6 @@ public class MasterProcedureTestingUtility {
     public Void call() throws Exception {
       final AssignmentManager am = env.getAssignmentManager();
       am.start();
-      MasterMetaBootstrap metaBootstrap = new MasterMetaBootstrap(master);
-      metaBootstrap.recoverMeta();
-      metaBootstrap.processDeadServers();
       am.joinCluster();
       master.setInitialized(true);
       return null;
@@ -22,18 +22,18 @@ import static org.junit.Assert.assertEquals;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.After;
@@ -84,7 +84,7 @@ public class TestMasterProcedureEvents {
   @After
   public void tearDown() throws Exception {
-    for (HTableDescriptor htd: UTIL.getAdmin().listTables()) {
+    for (TableDescriptor htd: UTIL.getAdmin().listTableDescriptors()) {
       LOG.info("Tear down, remove table=" + htd.getTableName());
       UTIL.deleteTable(htd.getTableName());
     }
@@ -96,58 +96,22 @@ public class TestMasterProcedureEvents {
     HMaster master = UTIL.getMiniHBaseCluster().getMaster();
     ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
-    HRegionInfo hri = new HRegionInfo(tableName);
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    htd.addFamily(new HColumnDescriptor("f"));
+    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
-    while (!master.isInitialized()) Thread.sleep(250);
+    while (!master.isInitialized()) {
+      Thread.sleep(250);
+    }
     master.setInitialized(false); // fake it, set back later
     // check event wait/wake
     testProcedureEventWaitWake(master, master.getInitializedEvent(),
-      new CreateTableProcedure(procExec.getEnvironment(), htd, new HRegionInfo[] { hri }));
+      new CreateTableProcedure(procExec.getEnvironment(), htd, new RegionInfo[] { hri }));
   }
-  @Test
-  public void testServerCrashProcedureEvent() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
-    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
-
-    while (!master.isServerCrashProcessingEnabled() || !master.isInitialized() ||
-        master.getAssignmentManager().getRegionStates().hasRegionsInTransition()) {
-      Thread.sleep(25);
-    }
-
-    UTIL.createTable(tableName, HBaseTestingUtility.COLUMNS[0]);
-    try (Table t = UTIL.getConnection().getTable(tableName)) {
-      // Load the table with a bit of data so some logs to split and some edits in each region.
-      UTIL.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
-    }
-
-    master.setServerCrashProcessingEnabled(false); // fake it, set back later
-
-    // Kill a server. Master will notice but do nothing other than add it to list of dead servers.
-    HRegionServer hrs = getServerWithRegions();
-    boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(hrs.getServerName());
-    UTIL.getHBaseCluster().killRegionServer(hrs.getServerName());
-    hrs.join();
-
-    // Wait until the expiration of the server has arrived at the master. We won't process it
-    // by queuing a ServerCrashProcedure because we have disabled crash processing... but wait
-    // here so ServerManager gets notice and adds expired server to appropriate queues.
-    while (!master.getServerManager().isServerDead(hrs.getServerName())) Thread.sleep(10);
-
-    // Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
-    master.getServerManager().moveFromOnlineToDeadServers(hrs.getServerName());
-
-    // check event wait/wake
-    testProcedureEventWaitWake(master, master.getServerCrashProcessingEnabledEvent(),
-      new ServerCrashProcedure(procExec.getEnvironment(), hrs.getServerName(), true, carryingMeta));
-  }
-
-  private void testProcedureEventWaitWake(final HMaster master, final ProcedureEvent event,
-      final Procedure proc) throws Exception {
+  private void testProcedureEventWaitWake(final HMaster master, final ProcedureEvent<?> event,
+      final Procedure<?> proc) throws Exception {
     final ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
     final MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureScheduler();
@@ -188,14 +152,4 @@ public class TestMasterProcedureEvents {
         " pollCalls=" + (procSched.getPollCalls() - startPollCalls) +
         " nullPollCalls=" + (procSched.getNullPollCalls() - startNullPollCalls));
   }
-
-  private HRegionServer getServerWithRegions() {
-    for (int i = 0; i < 3; ++i) {
-      HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i);
-      if (hrs.getNumberOfOnlineRegions() > 0) {
-        return hrs;
-      }
-    }
-    return null;
-  }
 }
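testProcedureEventWaitWake drives the suspend/wake contract of ProcedureEvent: a procedure submitted while the event is unready must park, and waking the event must hand it back to the scheduler. A toy analogue of that contract follows; the Event class here is illustrative, not the HBase implementation.

import java.util.ArrayDeque;
import java.util.Queue;

public class EventSketch {
  static class Event {
    private boolean ready;
    private final Queue<Runnable> suspended = new ArrayDeque<>();

    /** Returns true (and parks the procedure) if the caller must wait. */
    synchronized boolean suspendIfNotReady(Runnable proc) {
      if (!ready) {
        suspended.add(proc);
        return true;
      }
      return false;
    }

    /** Mark ready and re-queue everything that was parked. */
    synchronized void wake(Queue<Runnable> runQueue) {
      ready = true;
      runQueue.addAll(suspended);
      suspended.clear();
    }
  }

  public static void main(String[] args) {
    Queue<Runnable> runQueue = new ArrayDeque<>();
    Event initialized = new Event();
    Runnable proc = () -> System.out.println("proc executed");

    if (initialized.suspendIfNotReady(proc)) {
      System.out.println("proc suspended; run queue empty=" + runQueue.isEmpty());
    }
    initialized.wake(runQueue); // like master.setInitialized(true) waking the event
    runQueue.poll().run();      // the scheduler picks the procedure back up
  }
}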
@@ -23,10 +23,10 @@ import static org.junit.Assert.assertTrue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
@@ -102,13 +102,17 @@ public class TestServerCrashProcedure {
     testRecoveryAndDoubleExecution(false, true);
   }
+  private long getSCPProcId(ProcedureExecutor<?> procExec) {
+    util.waitFor(30000, () -> !procExec.getProcedures().isEmpty());
+    return procExec.getActiveProcIds().stream().mapToLong(Long::longValue).min().getAsLong();
+  }
   /**
    * Run server crash procedure steps twice to test idempotency and that we are persisting all
    * needed state.
-   * @throws Exception
    */
-  private void testRecoveryAndDoubleExecution(final boolean carryingMeta,
-      final boolean doubleExecution) throws Exception {
+  private void testRecoveryAndDoubleExecution(boolean carryingMeta, boolean doubleExecution)
+      throws Exception {
     final TableName tableName = TableName.valueOf(
       "testRecoveryAndDoubleExecution-carryingMeta-" + carryingMeta);
     final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
@@ -123,33 +127,29 @@ public class TestServerCrashProcedure {
     // Master's running of the server crash processing.
     final HMaster master = this.util.getHBaseCluster().getMaster();
     final ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
-    master.setServerCrashProcessingEnabled(false);
     // find the first server that match the request and executes the test
     ServerName rsToKill = null;
-    for (HRegionInfo hri : util.getHBaseAdmin().getTableRegions(tableName)) {
+    for (RegionInfo hri : util.getAdmin().getRegions(tableName)) {
       final ServerName serverName = AssignmentTestingUtil.getServerHoldingRegion(util, hri);
       if (AssignmentTestingUtil.isServerHoldingMeta(util, serverName) == carryingMeta) {
         rsToKill = serverName;
         break;
       }
     }
-    // kill the RS
-    AssignmentTestingUtil.killRs(util, rsToKill);
-    // Now, reenable processing else we can't get a lock on the ServerCrashProcedure.
-    master.setServerCrashProcessingEnabled(true);
-    // Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
-    master.getServerManager().moveFromOnlineToDeadServers(rsToKill);
     // Enable test flags and then queue the crash procedure.
     ProcedureTestingUtility.waitNoProcedureRunning(procExec);
-    ServerCrashProcedure scp = new ServerCrashProcedure(procExec.getEnvironment(), rsToKill,
-      true, carryingMeta);
     if (doubleExecution) {
       ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
-      long procId = procExec.submitProcedure(scp);
+      // kill the RS
+      AssignmentTestingUtil.killRs(util, rsToKill);
+      long procId = getSCPProcId(procExec);
       // Now run through the procedure twice crashing the executor on each step...
       MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
     } else {
-      ProcedureTestingUtility.submitAndWait(procExec, scp);
+      // kill the RS
+      AssignmentTestingUtil.killRs(util, rsToKill);
+      long procId = getSCPProcId(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
     }
     // Assert all data came back.
     assertEquals(count, util.countRows(t));
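Note the inversion in this test: the SCP is no longer constructed and submitted by the test itself; killing the region server makes the master schedule it, and getSCPProcId polls until the procedure shows up. A sketch of that poll-with-deadline pattern follows, where waitFor is a stand-in for HBaseTestingUtility.waitFor, not its real signature.

import java.util.concurrent.Callable;

public final class WaitUtil {
  private WaitUtil() {
  }

  /** Poll the predicate every 100ms until it holds or timeoutMs elapses. */
  public static void waitFor(long timeoutMs, Callable<Boolean> predicate) throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!predicate.call()) {
      if (System.currentTimeMillis() > deadline) {
        throw new AssertionError("condition not met within " + timeoutMs + "ms");
      }
      Thread.sleep(100);
    }
  }

  public static void main(String[] args) throws Exception {
    long start = System.currentTimeMillis();
    // Stands in for "wait until the master has queued a ServerCrashProcedure".
    waitFor(30000, () -> System.currentTimeMillis() - start > 500);
    System.out.println("observed the externally-scheduled event; safe to inspect it now");
  }
}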