diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 58ac82ee6cd..cbab595517f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -65,7 +65,7 @@ public class LogRoller extends AbstractWALRoller { } @VisibleForTesting - Map getWalNeedsRoll() { - return this.walNeedsRoll; + Map getWalNeedsRoll() { + return this.wals; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index d2b67170409..a5a0ee3a322 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -20,13 +20,13 @@ package org.apache.hadoop.hbase.wal; import java.io.Closeable; import java.io.IOException; import java.net.ConnectException; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; @@ -58,31 +58,31 @@ public abstract class AbstractWALRoller extends Thread protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period"; - protected final ConcurrentMap walNeedsRoll = new ConcurrentHashMap<>(); + protected final ConcurrentMap wals = new ConcurrentHashMap<>(); protected final T abortable; - private volatile long lastRollTime = System.currentTimeMillis(); // Period to roll log. private final long rollPeriod; private final int threadWakeFrequency; // The interval to check low replication on hlog's pipeline - private long checkLowReplicationInterval; + private final long checkLowReplicationInterval; private volatile boolean running = true; public void addWAL(WAL wal) { // check without lock first - if (walNeedsRoll.containsKey(wal)) { + if (wals.containsKey(wal)) { return; } // this is to avoid race between addWAL and requestRollAll. synchronized (this) { - if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) { + if (wals.putIfAbsent(wal, new RollController(wal)) == null) { wal.registerWALActionsListener(new WALActionsListener() { @Override public void logRollRequested(WALActionsListener.RollRequestReason reason) { // TODO logs will contend with each other here, replace with e.g. DelayedQueue synchronized (AbstractWALRoller.this) { - walNeedsRoll.put(wal, Boolean.TRUE); + RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal)); + controller.requestRoll(); AbstractWALRoller.this.notifyAll(); } } @@ -93,9 +93,8 @@ public abstract class AbstractWALRoller extends Thread public void requestRollAll() { synchronized (this) { - List wals = new ArrayList(walNeedsRoll.keySet()); - for (WAL wal : wals) { - walNeedsRoll.put(wal, Boolean.TRUE); + for (RollController controller : wals.values()) { + controller.requestRoll(); } notifyAll(); } @@ -115,9 +114,9 @@ public abstract class AbstractWALRoller extends Thread */ private void checkLowReplication(long now) { try { - for (Entry entry : walNeedsRoll.entrySet()) { + for (Entry entry : wals.entrySet()) { WAL wal = entry.getKey(); - boolean needRollAlready = entry.getValue(); + boolean needRollAlready = entry.getValue().needsRoll(now); if (needRollAlready || !(wal instanceof AbstractFSWAL)) { continue; } @@ -133,7 +132,7 @@ public abstract class AbstractWALRoller extends Thread // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it // is already broken. - for (WAL wal : walNeedsRoll.keySet()) { + for (WAL wal : wals.keySet()) { // shutdown rather than close here since we are going to abort the RS and the wals need to be // split when recovery try { @@ -148,53 +147,49 @@ public abstract class AbstractWALRoller extends Thread @Override public void run() { while (running) { - boolean periodic = false; long now = System.currentTimeMillis(); checkLowReplication(now); - periodic = (now - this.lastRollTime) > this.rollPeriod; - if (periodic) { - // Time for periodic roll, fall through - LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod); - } else { - synchronized (this) { - if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) { - // WAL roll requested, fall through - LOG.debug("WAL roll requested"); - } else { - try { - wait(this.threadWakeFrequency); - } catch (InterruptedException e) { - // restore the interrupt state - Thread.currentThread().interrupt(); - } - // goto the beginning to check whether again whether we should fall through to roll - // several WALs, and also check whether we should quit. - continue; + synchronized (this) { + if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) { + try { + wait(this.threadWakeFrequency); + } catch (InterruptedException e) { + // restore the interrupt state + Thread.currentThread().interrupt(); } + // goto the beginning to check whether again whether we should fall through to roll + // several WALs, and also check whether we should quit. + continue; } } try { - this.lastRollTime = System.currentTimeMillis(); - for (Iterator> iter = walNeedsRoll.entrySet().iterator(); iter - .hasNext();) { - Entry entry = iter.next(); + for (Iterator> iter = wals.entrySet().iterator(); + iter.hasNext();) { + Entry entry = iter.next(); WAL wal = entry.getKey(); - // reset the flag in front to avoid missing roll request before we return from rollWriter. - walNeedsRoll.put(wal, Boolean.FALSE); - Map> regionsToFlush = null; + RollController controller = entry.getValue(); + if (controller.isRollRequested()) { + // WAL roll requested, fall through + LOG.debug("WAL {} roll requested", wal); + } else if (controller.needsPeriodicRoll(now)){ + // Time for periodic roll, fall through + LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod); + } else { + continue; + } try { // Force the roll if the logroll.period is elapsed or if a roll was requested. // The returned value is an collection of actual region and family names. - regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue()); + Map> regionsToFlush = controller.rollWal(now); + if (regionsToFlush != null) { + for (Map.Entry> r : regionsToFlush.entrySet()) { + scheduleFlush(Bytes.toString(r.getKey()), r.getValue()); + } + } } catch (WALClosedException e) { LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e); iter.remove(); } - if (regionsToFlush != null) { - for (Map.Entry> r : regionsToFlush.entrySet()) { - scheduleFlush(Bytes.toString(r.getKey()), r.getValue()); - } - } afterRoll(wal); } } catch (FailedLogCloseException | ConnectException e) { @@ -232,7 +227,9 @@ public abstract class AbstractWALRoller extends Thread * @return true if all WAL roll finished */ public boolean walRollFinished() { - return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting(); + // TODO add a status field of roll in RollController + return wals.values().stream().noneMatch(rc -> rc.needsRoll(System.currentTimeMillis())) + && isWaiting(); } /** @@ -249,4 +246,43 @@ public abstract class AbstractWALRoller extends Thread running = false; interrupt(); } + + /** + * Independently control the roll of each wal. When use multiwal, + * can avoid all wal roll together. see HBASE-24665 for detail + */ + protected class RollController { + private final WAL wal; + private final AtomicBoolean rollRequest; + private long lastRollTime; + + RollController(WAL wal) { + this.wal = wal; + this.rollRequest = new AtomicBoolean(false); + this.lastRollTime = System.currentTimeMillis(); + } + + public void requestRoll() { + this.rollRequest.set(true); + } + + public Map> rollWal(long now) throws IOException { + this.lastRollTime = now; + // reset the flag in front to avoid missing roll request before we return from rollWriter. + this.rollRequest.set(false); + return wal.rollWriter(true); + } + + public boolean isRollRequested() { + return rollRequest.get(); + } + + public boolean needsPeriodicRoll(long now) { + return (now - this.lastRollTime) > rollPeriod; + } + + public boolean needsRoll(long now) { + return isRollRequested() || needsPeriodicRoll(now); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java index 7892d4478f2..ed7d5dc9c43 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java @@ -19,22 +19,28 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; @Category({RegionServerTests.class, MediumTests.class}) public class TestLogRoller { @@ -43,53 +49,104 @@ public class TestLogRoller { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestLogRoller.class); - private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final int logRollPeriod = 20 * 1000; + private static final int LOG_ROLL_PERIOD = 20 * 1000; + private static final String LOG_DIR = "WALs"; + private static final String ARCHIVE_DIR = "archiveWALs"; + private static final String WAL_PREFIX = "test-log-roller"; + private static Configuration CONF; + private static LogRoller ROLLER; + private static Path ROOT_DIR; + private static FileSystem FS; @Before public void setup() throws Exception { - TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.period", logRollPeriod); - TEST_UTIL.startMiniCluster(1); - TableName name = TableName.valueOf("Test"); - TEST_UTIL.createTable(name, Bytes.toBytes("cf")); - TEST_UTIL.waitTableAvailable(name); + CONF = TEST_UTIL.getConfiguration(); + CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD); + CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300); + ROOT_DIR = TEST_UTIL.getRandomDir(); + FS = FileSystem.get(CONF); + RegionServerServices services = Mockito.mock(RegionServerServices.class); + Mockito.when(services.getConfiguration()).thenReturn(CONF); + ROLLER = new LogRoller(services); + ROLLER.start(); } @After public void tearDown() throws Exception { + ROLLER.close(); + FS.close(); TEST_UTIL.shutdownMiniCluster(); } @Test public void testRemoveClosedWAL() throws Exception { - HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); - Configuration conf = rs.getConfiguration(); - LogRoller logRoller = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWalRoller(); - int originalSize = logRoller.getWalNeedsRoll().size(); - FSHLog wal1 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(), - AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf); - logRoller.addWAL(wal1); - FSHLog wal2 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(), - AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf); - logRoller.addWAL(wal2); - FSHLog wal3 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(), - AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf); - logRoller.addWAL(wal3); + assertEquals(0, ROLLER.getWalNeedsRoll().size()); + for (int i = 1; i <= 3; i++) { + FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null, + true, WAL_PREFIX, getWALSuffix(i)); + ROLLER.addWAL(wal); + } - assertEquals(originalSize + 3, logRoller.getWalNeedsRoll().size()); - assertTrue(logRoller.getWalNeedsRoll().containsKey(wal1)); + assertEquals(3, ROLLER.getWalNeedsRoll().size()); + Iterator it = ROLLER.getWalNeedsRoll().keySet().iterator(); + WAL wal = it.next(); + assertTrue(ROLLER.getWalNeedsRoll().containsKey(wal)); - wal1.close(); - Thread.sleep(2 * logRollPeriod); + wal.close(); + Thread.sleep(LOG_ROLL_PERIOD + 5000); - assertEquals(originalSize + 2, logRoller.getWalNeedsRoll().size()); - assertFalse(logRoller.getWalNeedsRoll().containsKey(wal1)); + assertEquals(2, ROLLER.getWalNeedsRoll().size()); + assertFalse(ROLLER.getWalNeedsRoll().containsKey(wal)); - wal2.close(); - wal3.close(); - Thread.sleep(2 * logRollPeriod); + wal = it.next(); + wal.close(); + wal = it.next(); + wal.close(); + Thread.sleep(LOG_ROLL_PERIOD + 5000); - assertEquals(originalSize, logRoller.getWalNeedsRoll().size()); + assertEquals(0, ROLLER.getWalNeedsRoll().size()); + } + + /** + * verify that each wal roll separately + */ + @Test + public void testRequestRollWithMultiWal() throws Exception { + // add multiple wal + Map wals = new HashMap<>(); + for (int i = 1; i <= 3; i++) { + FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null, + true, WAL_PREFIX, getWALSuffix(i)); + wal.init(); + wals.put(wal, wal.getCurrentFileName()); + ROLLER.addWAL(wal); + Thread.sleep(1000); + } + + // request roll + Iterator> it = wals.entrySet().iterator(); + Map.Entry walEntry = it.next(); + walEntry.getKey().requestLogRoll(); + Thread.sleep(5000); + + assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName()); + walEntry.setValue(walEntry.getKey().getCurrentFileName()); + while (it.hasNext()) { + walEntry = it.next(); + assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName()); + } + + // period roll + Thread.sleep(LOG_ROLL_PERIOD + 5000); + for (Map.Entry entry : wals.entrySet()) { + assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName()); + entry.getKey().close(); + } + } + + private static String getWALSuffix(int id) { + return "." + id; } }