HBASE-27157 Potential race condition in WorkerAssigner (#4577)

Co-authored-by: huiruan <huiruan@tencent.com>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Lijin Bin <binlijin@apache.org>
This commit is contained in:
Ruanhui 2022-07-06 10:59:13 +08:00 committed by GitHub
parent 6031a3a8d4
commit f76d8554ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 29 additions and 42 deletions

View File

@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
@@ -153,25 +152,19 @@ public class SplitWALManager {
*/
public ServerName acquireSplitWALWorker(Procedure<?> procedure)
throws ProcedureSuspendedException {
Optional<ServerName> worker = splitWorkerAssigner.acquire();
if (worker.isPresent()) {
LOG.debug("Acquired split WAL worker={}", worker.get());
return worker.get();
}
splitWorkerAssigner.suspend(procedure);
throw new ProcedureSuspendedException();
ServerName worker = splitWorkerAssigner.acquire(procedure);
LOG.debug("Acquired split WAL worker={}", worker);
return worker;
}
/**
* After the worker finished the split WAL task, it will release the worker, and wake up all the
* suspend procedures in the ProcedureEvent
* @param worker worker which is about to release
* @param scheduler scheduler which is to wake up the procedure event
*/
public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
public void releaseSplitWALWorker(ServerName worker) {
LOG.debug("Release split WAL worker={}", worker);
splitWorkerAssigner.release(worker);
splitWorkerAssigner.wake(scheduler);
}
/**

View File

@@ -23,9 +23,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -51,7 +51,7 @@ public class WorkerAssigner implements ServerListener {
}
}
public synchronized Optional<ServerName> acquire() {
public synchronized ServerName acquire(Procedure<?> proc) throws ProcedureSuspendedException {
List<ServerName> serverList = master.getServerManager().getOnlineServersList();
Collections.shuffle(serverList);
Optional<ServerName> worker = serverList.stream()
@@ -60,27 +60,30 @@ public class WorkerAssigner implements ServerListener {
.findAny();
worker.ifPresent(name -> currentWorkers.compute(name, (serverName,
availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1));
return worker;
if (worker.isPresent()) {
ServerName sn = worker.get();
currentWorkers.compute(sn, (serverName,
availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1);
return sn;
} else {
event.suspend();
event.suspendIfNotReady(proc);
throw new ProcedureSuspendedException();
}
}
public synchronized void release(ServerName serverName) {
currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
}
public void suspend(Procedure<?> proc) {
event.suspend();
event.suspendIfNotReady(proc);
}
public void wake(MasterProcedureScheduler scheduler) {
if (!event.isReady()) {
event.wake(scheduler);
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
}
}
@Override
public void serverAdded(ServerName worker) {
this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
public synchronized void serverAdded(ServerName worker) {
if (!event.isReady()) {
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
}
}
public synchronized void addUsedWorker(ServerName worker) {

View File

@@ -109,8 +109,7 @@ public class SnapshotVerifyProcedure extends ServerRemoteProcedure
setFailure("verify-snapshot", e);
} finally {
// release the worker
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer,
env.getProcedureScheduler());
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer);
}
}

View File

@@ -90,7 +90,7 @@ public class SplitWALProcedure
skipPersistence();
throw new ProcedureSuspendedException();
}
splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
splitWALManager.releaseSplitWALWorker(worker);
if (!finished) {
LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker);
setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);

View File

@@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
@@ -1419,20 +1418,14 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
throws ProcedureSuspendedException {
Optional<ServerName> worker = verifyWorkerAssigner.acquire();
if (worker.isPresent()) {
LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get());
return worker.get();
}
verifyWorkerAssigner.suspend(procedure);
throw new ProcedureSuspendedException();
ServerName worker = verifyWorkerAssigner.acquire(procedure);
LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker);
return worker;
}
public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker,
MasterProcedureScheduler scheduler) {
public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) {
LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
verifyWorkerAssigner.release(worker);
verifyWorkerAssigner.wake(scheduler);
}
private void restoreWorkers() {

View File

@@ -115,8 +115,7 @@ public class TestSplitWALManager {
Assert.assertNotNull(e);
Assert.assertTrue(e instanceof ProcedureSuspendedException);
splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster()
.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
splitWALManager.releaseSplitWALWorker(server);
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
}
@@ -348,7 +347,7 @@ public class TestSplitWALManager {
setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
return Flow.HAS_MORE_STATE;
case RELEASE_SPLIT_WORKER:
splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
splitWALManager.releaseSplitWALWorker(worker);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);