diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 58ac82ee6cd..cbab595517f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -65,7 +65,7 @@ public class LogRoller extends AbstractWALRoller { } @VisibleForTesting - Map getWalNeedsRoll() { - return this.walNeedsRoll; + Map getWalNeedsRoll() { + return this.wals; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index 699902009fa..9263c0f9d68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -20,13 +20,13 @@ package org.apache.hadoop.hbase.wal; import java.io.Closeable; import java.io.IOException; import java.net.ConnectException; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; @@ -57,31 +57,31 @@ public abstract class AbstractWALRoller extends Thread protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period"; - protected final ConcurrentMap walNeedsRoll = new ConcurrentHashMap<>(); + protected final ConcurrentMap wals = new ConcurrentHashMap<>(); protected final T abortable; - private volatile long lastRollTime = System.currentTimeMillis(); // Period to roll log. private final long rollPeriod; private final int threadWakeFrequency; // The interval to check low replication on hlog's pipeline - private long checkLowReplicationInterval; + private final long checkLowReplicationInterval; private volatile boolean running = true; public void addWAL(WAL wal) { // check without lock first - if (walNeedsRoll.containsKey(wal)) { + if (wals.containsKey(wal)) { return; } // this is to avoid race between addWAL and requestRollAll. synchronized (this) { - if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) { + if (wals.putIfAbsent(wal, new RollController(wal)) == null) { wal.registerWALActionsListener(new WALActionsListener() { @Override public void logRollRequested(WALActionsListener.RollRequestReason reason) { // TODO logs will contend with each other here, replace with e.g. DelayedQueue synchronized (AbstractWALRoller.this) { - walNeedsRoll.put(wal, Boolean.TRUE); + RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal)); + controller.requestRoll(); AbstractWALRoller.this.notifyAll(); } } @@ -92,9 +92,8 @@ public abstract class AbstractWALRoller extends Thread public void requestRollAll() { synchronized (this) { - List wals = new ArrayList(walNeedsRoll.keySet()); - for (WAL wal : wals) { - walNeedsRoll.put(wal, Boolean.TRUE); + for (RollController controller : wals.values()) { + controller.requestRoll(); } notifyAll(); } @@ -114,9 +113,9 @@ public abstract class AbstractWALRoller extends Thread */ private void checkLowReplication(long now) { try { - for (Entry entry : walNeedsRoll.entrySet()) { + for (Entry entry : wals.entrySet()) { WAL wal = entry.getKey(); - boolean needRollAlready = entry.getValue(); + boolean needRollAlready = entry.getValue().needsRoll(now); if (needRollAlready || !(wal instanceof AbstractFSWAL)) { continue; } @@ -132,7 +131,7 @@ public abstract class AbstractWALRoller extends Thread // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it // is already broken. - for (WAL wal : walNeedsRoll.keySet()) { + for (WAL wal : wals.keySet()) { // shutdown rather than close here since we are going to abort the RS and the wals need to be // split when recovery try { @@ -147,43 +146,39 @@ public abstract class AbstractWALRoller extends Thread @Override public void run() { while (running) { - boolean periodic = false; long now = System.currentTimeMillis(); checkLowReplication(now); - periodic = (now - this.lastRollTime) > this.rollPeriod; - if (periodic) { - // Time for periodic roll, fall through - LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod); - } else { - synchronized (this) { - if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) { - // WAL roll requested, fall through - LOG.debug("WAL roll requested"); - } else { - try { - wait(this.threadWakeFrequency); - } catch (InterruptedException e) { - // restore the interrupt state - Thread.currentThread().interrupt(); - } - // goto the beginning to check whether again whether we should fall through to roll - // several WALs, and also check whether we should quit. - continue; + synchronized (this) { + if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) { + try { + wait(this.threadWakeFrequency); + } catch (InterruptedException e) { + // restore the interrupt state + Thread.currentThread().interrupt(); } + // goto the beginning to check whether again whether we should fall through to roll + // several WALs, and also check whether we should quit. + continue; } } try { - this.lastRollTime = System.currentTimeMillis(); - for (Iterator> iter = walNeedsRoll.entrySet().iterator(); iter - .hasNext();) { - Entry entry = iter.next(); + for (Iterator> iter = wals.entrySet().iterator(); + iter.hasNext();) { + Entry entry = iter.next(); WAL wal = entry.getKey(); - // reset the flag in front to avoid missing roll request before we return from rollWriter. - walNeedsRoll.put(wal, Boolean.FALSE); + RollController controller = entry.getValue(); + if (controller.isRollRequested()) { + // WAL roll requested, fall through + LOG.debug("WAL {} roll requested", wal); + } else if (controller.needsPeriodicRoll(now)) { + // Time for periodic roll, fall through + LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod); + } else { + continue; + } // Force the roll if the logroll.period is elapsed or if a roll was requested. // The returned value is an collection of actual region and family names. - Map> regionsToFlush = wal.rollWriter(periodic || - entry.getValue().booleanValue()); + Map> regionsToFlush = controller.rollWal(now); if (regionsToFlush != null) { for (Map.Entry> r : regionsToFlush.entrySet()) { scheduleFlush(Bytes.toString(r.getKey()), r.getValue()); @@ -226,7 +221,8 @@ public abstract class AbstractWALRoller extends Thread * @return true if all WAL roll finished */ public boolean walRollFinished() { - return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting(); + return wals.values().stream().noneMatch(rc -> rc.needsRoll(System.currentTimeMillis())) + && isWaiting(); } /** @@ -243,4 +239,43 @@ public abstract class AbstractWALRoller extends Thread running = false; interrupt(); } + + /** + * Independently control the roll of each wal. When use multiwal, + * can avoid all wal roll together. see HBASE-24665 for detail + */ + protected class RollController { + private final WAL wal; + private final AtomicBoolean rollRequest; + private long lastRollTime; + + RollController(WAL wal) { + this.wal = wal; + this.rollRequest = new AtomicBoolean(false); + this.lastRollTime = System.currentTimeMillis(); + } + + public void requestRoll() { + this.rollRequest.set(true); + } + + public Map> rollWal(long now) throws IOException { + this.lastRollTime = now; + // reset the flag in front to avoid missing roll request before we return from rollWriter. + this.rollRequest.set(false); + return wal.rollWriter(true); + } + + public boolean isRollRequested() { + return rollRequest.get(); + } + + public boolean needsPeriodicRoll(long now) { + return (now - this.lastRollTime) > rollPeriod; + } + + public boolean needsRoll(long now) { + return isRollRequested() || needsPeriodicRoll(now); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java new file mode 100644 index 00000000000..86b9bc3814b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +@Category({RegionServerTests.class, MediumTests.class}) +public class TestLogRoller { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestLogRoller.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final int LOG_ROLL_PERIOD = 20 * 1000; + private static final String LOG_DIR = "WALs"; + private static final String ARCHIVE_DIR = "archiveWALs"; + private static final String WAL_PREFIX = "test-log-roller"; + private static Configuration CONF; + private static LogRoller ROLLER; + private static Path ROOT_DIR; + private static FileSystem FS; + + @Before + public void setup() throws Exception { + CONF = TEST_UTIL.getConfiguration(); + CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD); + CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300); + ROOT_DIR = TEST_UTIL.getRandomDir(); + FS = FileSystem.get(CONF); + RegionServerServices services = Mockito.mock(RegionServerServices.class); + Mockito.when(services.getConfiguration()).thenReturn(CONF); + ROLLER = new LogRoller(services); + ROLLER.start(); + } + + @After + public void tearDown() throws Exception { + ROLLER.close(); + FS.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * verify that each wal roll separately + */ + @Test + public void testRequestRollWithMultiWal() throws Exception { + // add multiple wal + Map wals = new HashMap<>(); + for (int i = 1; i <= 3; i++) { + FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null, + true, WAL_PREFIX, "." + i); + wal.init(); + wals.put(wal, wal.getCurrentFileName()); + ROLLER.addWAL(wal); + Thread.sleep(1000); + } + + // request roll + Iterator> it = wals.entrySet().iterator(); + Map.Entry walEntry = it.next(); + walEntry.getKey().requestLogRoll(); + Thread.sleep(5000); + + assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName()); + walEntry.setValue(walEntry.getKey().getCurrentFileName()); + while (it.hasNext()) { + walEntry = it.next(); + assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName()); + } + + // period roll + Thread.sleep(LOG_ROLL_PERIOD + 5000); + for (Map.Entry entry : wals.entrySet()) { + assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName()); + entry.getKey().close(); + } + } +}