From 2567d1521875ca39eca4762242ce91a3012e088e Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Mon, 15 Jun 2020 12:22:38 -0700 Subject: [PATCH] HBASE-24564: Make RS abort call idempotent. Signed-off-by: Duo Zhang --- .../apache/hadoop/hbase/master/HMaster.java | 3 +- .../hbase/regionserver/HRegionServer.java | 42 +++++++++++------- .../regionserver/TestRegionServerAbort.java | 43 +++++++++++++++++++ 3 files changed, 70 insertions(+), 18 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index a95f133e23b..9bd5c8cab40 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -2824,11 +2824,10 @@ public class HMaster extends HRegionServer implements MasterServices { @Override public void abort(String reason, Throwable cause) { - if (isAborted() || isStopped()) { + if (!setAbortRequested() || isStopped()) { LOG.debug("Abort called but aborted={}, stopped={}", isAborted(), isStopped()); return; } - setAbortRequested(); if (cpHost != null) { // HBASE-4014: dump a list of loaded coprocessors. LOG.error(HBaseMarkers.FATAL, "Master server abort: loaded coprocessors are: " + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 26b3fc25195..0b97c99a812 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -335,7 +335,7 @@ public class HRegionServer extends Thread implements // Go down hard. Used if file system becomes unavailable and also in // debugging and unit tests. - private volatile boolean abortRequested; + private AtomicBoolean abortRequested; static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout"; // Default abort timeout is 1200 seconds for safe private static final long DEFAULT_ABORT_TIMEOUT = 1200000; @@ -591,7 +591,7 @@ public class HRegionServer extends Thread implements this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); - this.abortRequested = false; + this.abortRequested = new AtomicBoolean(false); this.stopped = false; if (!(this instanceof HMaster)) { @@ -1005,7 +1005,7 @@ public class HRegionServer extends Thread implements } else if (!this.stopping) { this.stopping = true; LOG.info("Closing user regions"); - closeUserRegions(this.abortRequested); + closeUserRegions(this.abortRequested.get()); } else { boolean allUserRegionsOffline = areAllUserRegionsOffline(); if (allUserRegionsOffline) { @@ -1021,7 +1021,7 @@ public class HRegionServer extends Thread implements // Make sure all regions have been closed -- some regions may // have not got it because we were splitting at the time of // the call to closeUserRegions. - closeUserRegions(this.abortRequested); + closeUserRegions(this.abortRequested.get()); } LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString()); } @@ -1076,18 +1076,18 @@ public class HRegionServer extends Thread implements // Stop the snapshot and other procedure handlers, forcefully killing all running tasks if (rspmHost != null) { - rspmHost.stop(this.abortRequested || this.killed); + rspmHost.stop(this.abortRequested.get() || this.killed); } if (this.killed) { // Just skip out w/o closing regions. Used when testing. - } else if (abortRequested) { + } else if (abortRequested.get()) { if (this.dataFsOk) { - closeUserRegions(abortRequested); // Don't leave any open file handles + closeUserRegions(abortRequested.get()); // Don't leave any open file handles } LOG.info("aborting server " + this.serverName); } else { - closeUserRegions(abortRequested); + closeUserRegions(abortRequested.get()); LOG.info("stopping server " + this.serverName); } @@ -1102,17 +1102,17 @@ public class HRegionServer extends Thread implements } // Closing the compactSplit thread before closing meta regions if (!this.killed && containsMetaTableRegions()) { - if (!abortRequested || this.dataFsOk) { + if (!abortRequested.get() || this.dataFsOk) { if (this.compactSplitThread != null) { this.compactSplitThread.join(); this.compactSplitThread = null; } - closeMetaTableRegions(abortRequested); + closeMetaTableRegions(abortRequested.get()); } } if (!this.killed && this.dataFsOk) { - waitOnAllRegionsToClose(abortRequested); + waitOnAllRegionsToClose(abortRequested.get()); LOG.info("stopping server " + this.serverName + "; all regions closed."); } @@ -1127,7 +1127,7 @@ public class HRegionServer extends Thread implements // flag may be changed when closing regions throws exception. if (this.dataFsOk) { - shutdownWAL(!abortRequested); + shutdownWAL(!abortRequested.get()); } // Make sure the proxy is down. @@ -2475,13 +2475,18 @@ public class HRegionServer extends Thread implements */ @Override public void abort(String reason, Throwable cause) { + if (!setAbortRequested()) { + // Abort already in progress, ignore the new request. + LOG.debug( + "Abort already in progress. Ignoring the current request with reason: {}", reason); + return; + } String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; if (cause != null) { LOG.error(HBaseMarkers.FATAL, msg, cause); } else { LOG.error(HBaseMarkers.FATAL, msg); } - setAbortRequested(); // HBASE-4014: show list of coprocessors that were loaded to help debug // regionserver crashes.Note that we're implicitly using // java.util.HashSet's toString() method to print the coprocessor names. @@ -2517,8 +2522,13 @@ public class HRegionServer extends Thread implements stop(reason, true, null); } - protected final void setAbortRequested() { - this.abortRequested = true; + /** + * Sets the abort state if not already set. + * @return True if abortRequested set to True successfully, false if an abort is already in + * progress. + */ + protected boolean setAbortRequested() { + return abortRequested.compareAndSet(false, true); } /** @@ -2530,7 +2540,7 @@ public class HRegionServer extends Thread implements @Override public boolean isAborted() { - return this.abortRequested; + return abortRequested.get(); } /* diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbort.java index 277cb8139e1..478393d81f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbort.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -24,6 +25,10 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,6 +38,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; @@ -171,11 +177,43 @@ public class TestRegionServerAbort { assertFalse(cluster.getRegionServer(0).isStopped()); } + /** + * Tests that only a single abort is processed when multiple aborts are requested. + */ + @Test + public void testMultiAbort() { + assertTrue(cluster.getRegionServerThreads().size() > 0); + JVMClusterUtil.RegionServerThread t = cluster.getRegionServerThreads().get(0); + assertTrue(t.isAlive()); + HRegionServer rs = t.getRegionServer(); + assertFalse(rs.isAborted()); + RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost(); + StopBlockingRegionObserver cp = (StopBlockingRegionObserver)cpHost.findCoprocessor( + StopBlockingRegionObserver.class.getName()); + // Enable clean abort. + cp.setStopAllowed(true); + // Issue two aborts in quick succession. + // We need a thread pool here, otherwise the abort() runs into SecurityException when running + // from the fork join pool when setting the context classloader. + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + CompletableFuture.runAsync(() -> rs.abort("Abort 1"), executor); + CompletableFuture.runAsync(() -> rs.abort("Abort 2"), executor); + long testTimeoutMs = 10 * 1000; + Waiter.waitFor(cluster.getConf(), testTimeoutMs, (Waiter.Predicate) rs::isStopped); + // Make sure only one abort is received. + assertEquals(1, cp.getNumAbortsRequested()); + } finally { + executor.shutdownNow(); + } + } + @CoreCoprocessor public static class StopBlockingRegionObserver implements RegionServerCoprocessor, RegionCoprocessor, RegionServerObserver, RegionObserver { public static final String DO_ABORT = "DO_ABORT"; private boolean stopAllowed; + private AtomicInteger abortCount = new AtomicInteger(); @Override public Optional getRegionObserver() { @@ -203,11 +241,16 @@ public class TestRegionServerAbort { @Override public void preStopRegionServer(ObserverContext env) throws IOException { + abortCount.incrementAndGet(); if (!stopAllowed) { throw new IOException("Stop not allowed"); } } + public int getNumAbortsRequested() { + return abortCount.get(); + } + public void setStopAllowed(boolean allowed) { this.stopAllowed = allowed; }