HBASE-24564: Make RS abort call idempotent.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Bharath Vissapragada 2020-06-15 12:22:38 -07:00
parent 2ecbaf130e
commit 2567d15218
3 changed files with 70 additions and 18 deletions

View File

@ -2824,11 +2824,10 @@ public class HMaster extends HRegionServer implements MasterServices {
@Override
public void abort(String reason, Throwable cause) {
if (isAborted() || isStopped()) {
if (!setAbortRequested() || isStopped()) {
LOG.debug("Abort called but aborted={}, stopped={}", isAborted(), isStopped());
return;
}
setAbortRequested();
if (cpHost != null) {
// HBASE-4014: dump a list of loaded coprocessors.
LOG.error(HBaseMarkers.FATAL, "Master server abort: loaded coprocessors are: " +

View File

@ -335,7 +335,7 @@ public class HRegionServer extends Thread implements
// Go down hard. Used if file system becomes unavailable and also in
// debugging and unit tests.
private volatile boolean abortRequested;
private AtomicBoolean abortRequested;
static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
// Default abort timeout is 1200 seconds for safe
private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
@ -591,7 +591,7 @@ public class HRegionServer extends Thread implements
this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
this.abortRequested = false;
this.abortRequested = new AtomicBoolean(false);
this.stopped = false;
if (!(this instanceof HMaster)) {
@ -1005,7 +1005,7 @@ public class HRegionServer extends Thread implements
} else if (!this.stopping) {
this.stopping = true;
LOG.info("Closing user regions");
closeUserRegions(this.abortRequested);
closeUserRegions(this.abortRequested.get());
} else {
boolean allUserRegionsOffline = areAllUserRegionsOffline();
if (allUserRegionsOffline) {
@ -1021,7 +1021,7 @@ public class HRegionServer extends Thread implements
// Make sure all regions have been closed -- some regions may
// have not got it because we were splitting at the time of
// the call to closeUserRegions.
closeUserRegions(this.abortRequested);
closeUserRegions(this.abortRequested.get());
}
LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
}
@ -1076,18 +1076,18 @@ public class HRegionServer extends Thread implements
// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
if (rspmHost != null) {
rspmHost.stop(this.abortRequested || this.killed);
rspmHost.stop(this.abortRequested.get() || this.killed);
}
if (this.killed) {
// Just skip out w/o closing regions. Used when testing.
} else if (abortRequested) {
} else if (abortRequested.get()) {
if (this.dataFsOk) {
closeUserRegions(abortRequested); // Don't leave any open file handles
closeUserRegions(abortRequested.get()); // Don't leave any open file handles
}
LOG.info("aborting server " + this.serverName);
} else {
closeUserRegions(abortRequested);
closeUserRegions(abortRequested.get());
LOG.info("stopping server " + this.serverName);
}
@ -1102,17 +1102,17 @@ public class HRegionServer extends Thread implements
}
// Closing the compactSplit thread before closing meta regions
if (!this.killed && containsMetaTableRegions()) {
if (!abortRequested || this.dataFsOk) {
if (!abortRequested.get() || this.dataFsOk) {
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
this.compactSplitThread = null;
}
closeMetaTableRegions(abortRequested);
closeMetaTableRegions(abortRequested.get());
}
}
if (!this.killed && this.dataFsOk) {
waitOnAllRegionsToClose(abortRequested);
waitOnAllRegionsToClose(abortRequested.get());
LOG.info("stopping server " + this.serverName + "; all regions closed.");
}
@ -1127,7 +1127,7 @@ public class HRegionServer extends Thread implements
// flag may be changed when closing regions throws exception.
if (this.dataFsOk) {
shutdownWAL(!abortRequested);
shutdownWAL(!abortRequested.get());
}
// Make sure the proxy is down.
@ -2475,13 +2475,18 @@ public class HRegionServer extends Thread implements
*/
@Override
public void abort(String reason, Throwable cause) {
if (!setAbortRequested()) {
// Abort already in progress, ignore the new request.
LOG.debug(
"Abort already in progress. Ignoring the current request with reason: {}", reason);
return;
}
String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
if (cause != null) {
LOG.error(HBaseMarkers.FATAL, msg, cause);
} else {
LOG.error(HBaseMarkers.FATAL, msg);
}
setAbortRequested();
// HBASE-4014: show list of coprocessors that were loaded to help debug
// regionserver crashes.Note that we're implicitly using
// java.util.HashSet's toString() method to print the coprocessor names.
@ -2517,8 +2522,13 @@ public class HRegionServer extends Thread implements
stop(reason, true, null);
}
protected final void setAbortRequested() {
this.abortRequested = true;
/**
* Sets the abort state if not already set.
* @return True if abortRequested set to True successfully, false if an abort is already in
* progress.
*/
protected boolean setAbortRequested() {
return abortRequested.compareAndSet(false, true);
}
/**
@ -2530,7 +2540,7 @@ public class HRegionServer extends Thread implements
@Override
public boolean isAborted() {
return this.abortRequested;
return abortRequested.get();
}
/*

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -24,6 +25,10 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -33,6 +38,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
@ -171,11 +177,43 @@ public class TestRegionServerAbort {
assertFalse(cluster.getRegionServer(0).isStopped());
}
/**
* Tests that only a single abort is processed when multiple aborts are requested.
*/
@Test
public void testMultiAbort() {
assertTrue(cluster.getRegionServerThreads().size() > 0);
JVMClusterUtil.RegionServerThread t = cluster.getRegionServerThreads().get(0);
assertTrue(t.isAlive());
HRegionServer rs = t.getRegionServer();
assertFalse(rs.isAborted());
RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost();
StopBlockingRegionObserver cp = (StopBlockingRegionObserver)cpHost.findCoprocessor(
StopBlockingRegionObserver.class.getName());
// Enable clean abort.
cp.setStopAllowed(true);
// Issue two aborts in quick succession.
// We need a thread pool here, otherwise the abort() runs into SecurityException when running
// from the fork join pool when setting the context classloader.
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
CompletableFuture.runAsync(() -> rs.abort("Abort 1"), executor);
CompletableFuture.runAsync(() -> rs.abort("Abort 2"), executor);
long testTimeoutMs = 10 * 1000;
Waiter.waitFor(cluster.getConf(), testTimeoutMs, (Waiter.Predicate<Exception>) rs::isStopped);
// Make sure only one abort is received.
assertEquals(1, cp.getNumAbortsRequested());
} finally {
executor.shutdownNow();
}
}
@CoreCoprocessor
public static class StopBlockingRegionObserver
implements RegionServerCoprocessor, RegionCoprocessor, RegionServerObserver, RegionObserver {
public static final String DO_ABORT = "DO_ABORT";
private boolean stopAllowed;
private AtomicInteger abortCount = new AtomicInteger();
@Override
public Optional<RegionObserver> getRegionObserver() {
@ -203,11 +241,16 @@ public class TestRegionServerAbort {
@Override
public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env)
throws IOException {
abortCount.incrementAndGet();
if (!stopAllowed) {
throw new IOException("Stop not allowed");
}
}
public int getNumAbortsRequested() {
return abortCount.get();
}
public void setStopAllowed(boolean allowed) {
this.stopAllowed = allowed;
}