HBASE-21468 separate workers for meta table is not working
This commit is contained in:
parent
c95832159f
commit
0f295de156
|
@ -124,7 +124,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
|
|||
try {
|
||||
enqueue(procedure, addFront);
|
||||
if (notify) {
|
||||
schedWaitCond.signal();
|
||||
schedWaitCond.signalAll();
|
||||
}
|
||||
} finally {
|
||||
schedUnlock();
|
||||
|
@ -311,10 +311,6 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
|
|||
if (waitingCount <= 0) {
|
||||
return;
|
||||
}
|
||||
if (waitingCount == 1) {
|
||||
schedWaitCond.signal();
|
||||
} else {
|
||||
schedWaitCond.signalAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -211,7 +211,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
/**
|
||||
* Worker thread only for urgent tasks.
|
||||
*/
|
||||
private List<WorkerThread> urgentWorkerThreads;
|
||||
private CopyOnWriteArrayList<WorkerThread> urgentWorkerThreads;
|
||||
|
||||
/**
|
||||
* Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
|
||||
|
@ -564,7 +564,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
* is found on replay. otherwise false.
|
||||
*/
|
||||
public void init(int numThreads, boolean abortOnCorruption) throws IOException {
|
||||
init(numThreads, 1, abortOnCorruption);
|
||||
init(numThreads, 0, abortOnCorruption);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -595,7 +595,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
// Create the workers
|
||||
workerId.set(0);
|
||||
workerThreads = new CopyOnWriteArrayList<>();
|
||||
urgentWorkerThreads = new ArrayList<>();
|
||||
urgentWorkerThreads = new CopyOnWriteArrayList<>();
|
||||
for (int i = 0; i < corePoolSize; ++i) {
|
||||
workerThreads.add(new WorkerThread(threadGroup));
|
||||
}
|
||||
|
@ -637,7 +637,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
return;
|
||||
}
|
||||
// Start the executors. Here we must have the lastProcId set.
|
||||
LOG.debug("Start workers {}, urgent workers", workerThreads.size(),
|
||||
LOG.debug("Start workers {}, urgent workers {}", workerThreads.size(),
|
||||
urgentWorkerThreads.size());
|
||||
timeoutExecutor.start();
|
||||
for (WorkerThread worker: workerThreads) {
|
||||
|
@ -2023,7 +2023,8 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
long lastUpdate = EnvironmentEdgeManager.currentTime();
|
||||
try {
|
||||
while (isRunning() && keepAlive(lastUpdate)) {
|
||||
Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
|
||||
Procedure<TEnvironment> proc = scheduler
|
||||
.poll(onlyPollUrgent, keepAliveTime, TimeUnit.MILLISECONDS);
|
||||
if (proc == null) {
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ public class ProcedureTestingUtility {
|
|||
|
||||
public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
|
||||
boolean abortOnCorruption, boolean startWorkers) throws IOException {
|
||||
initAndStartWorkers(procExecutor, numThreads, 1, abortOnCorruption, startWorkers);
|
||||
initAndStartWorkers(procExecutor, numThreads, 0, abortOnCorruption, startWorkers);
|
||||
}
|
||||
|
||||
public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
|
||||
|
|
|
@ -380,7 +380,6 @@ public class TestYieldProcedures {
|
|||
|
||||
@Override
|
||||
public Procedure poll() {
|
||||
LOG.error("polled()");
|
||||
pollCalls++;
|
||||
return super.poll();
|
||||
}
|
||||
|
@ -388,10 +387,15 @@ public class TestYieldProcedures {
|
|||
@Override
|
||||
public Procedure poll(long timeout, TimeUnit unit) {
|
||||
pollCalls++;
|
||||
LOG.error("polled(long timeout, TimeUnit unit)");
|
||||
return super.poll(timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Procedure poll(boolean onlyUrgent, long timeout, TimeUnit unit) {
|
||||
pollCalls++;
|
||||
return super.poll(onlyUrgent, timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completionCleanup(Procedure proc) {
|
||||
completionCalls++;
|
||||
|
|
|
@ -62,6 +62,7 @@ public class TestAssignmentOnRSCrash {
|
|||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
conf.set("hbase.balancer.tablesOnMaster", "none");
|
||||
}
|
||||
|
||||
|
|
|
@ -86,6 +86,7 @@ public class TestMergeTableRegionsProcedure {
|
|||
conf.setInt("hbase.master.maximum.ping.server.attempts", 3);
|
||||
conf.setInt("hbase.master.ping.server.retry.sleep.interval", 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
|
|
@ -94,6 +94,7 @@ public class TestRogueRSAssignment {
|
|||
conf.setInt("hbase.master.maximum.ping.server.attempts", 3);
|
||||
conf.setInt("hbase.master.ping.server.retry.sleep.interval", 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
|
|
@ -70,6 +70,7 @@ public class TestLockManager {
|
|||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
conf.setBoolean("hbase.procedure.check.owner.set", false); // since rpc user will be null
|
||||
conf.setInt(LockProcedure.LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, LOCAL_LOCKS_TIMEOUT);
|
||||
}
|
||||
|
|
|
@ -98,6 +98,7 @@ public class TestLockProcedure {
|
|||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
conf.setBoolean("hbase.procedure.check.owner.set", false); // since rpc user will be null
|
||||
conf.setInt(LockProcedure.REMOTE_LOCKS_TIMEOUT_MS_CONF, HEARTBEAT_TIMEOUT);
|
||||
conf.setInt(LockProcedure.LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, LOCAL_LOCKS_TIMEOUT);
|
||||
|
|
|
@ -57,6 +57,7 @@ public class TestCreateNamespaceProcedure {
|
|||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
|
|
@ -63,6 +63,7 @@ public class TestDeleteNamespaceProcedure {
|
|||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
|
|
@ -67,6 +67,7 @@ public class TestMasterFailoverWithProcedures {
|
|||
conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 1);
|
||||
conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
}
|
||||
|
||||
@Before
|
||||
|
|
|
@ -72,6 +72,7 @@ public class TestMasterObserverPostCalls {
|
|||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
conf.set(MasterCoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
|
||||
MasterObserverForTest.class.getName());
|
||||
}
|
||||
|
|
|
@ -63,6 +63,7 @@ public class TestMasterProcedureEvents {
|
|||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
|
||||
}
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ public class TestModifyNamespaceProcedure {
|
|||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
|
|
@ -60,6 +60,7 @@ public class TestProcedureAdmin {
|
|||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
|
|
@ -58,6 +58,7 @@ public class TestSafemodeBringsDownMaster {
|
|||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
|
||||
conf.set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
|
||||
}
|
||||
|
||||
|
|
|
@ -169,7 +169,7 @@ public class TestUrgentProcedureWorker {
|
|||
MasterProcedureScheduler scheduler = new MasterProcedureScheduler(pid -> null);
|
||||
procExec = new ProcedureExecutor<>(UTIL.getConfiguration(), new TestEnv(scheduler), procStore,
|
||||
scheduler);
|
||||
procExec.init(1, false);
|
||||
procExec.init(1, 1, false);
|
||||
procExec.startWorkers();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue