From e06695112a358706344cc8682f2713b43daab340 Mon Sep 17 00:00:00 2001 From: WenFeiYi Date: Fri, 16 Oct 2020 20:42:18 +0800 Subject: [PATCH] HBASE-24849 Branch-1 Backport : HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll (#2194) Signed-off-by: Reid Chan --- .../hadoop/hbase/regionserver/LogRoller.java | 108 ++++++++++++----- .../hbase/regionserver/TestLogRoller.java | 114 ++++++++++++++++++ 2 files changed, 194 insertions(+), 28 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index fd208c25b06..08d5a335e32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.util.Bytes; @@ -56,23 +57,27 @@ public class LogRoller extends HasThread { private static final Log LOG = LogFactory.getLog(LogRoller.class); private final ReentrantLock rollLock = new ReentrantLock(); private final AtomicBoolean rollLog = new AtomicBoolean(false); - private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll = - new ConcurrentHashMap<WAL, Boolean>(); + private final ConcurrentHashMap<WAL, RollController> wals = + new ConcurrentHashMap<WAL, RollController>(); private final Server server; protected final RegionServerServices services; - private volatile long lastrolltime = System.currentTimeMillis(); // Period to roll log. 
- private final long rollperiod; + private final long rollPeriod; private final int threadWakeFrequency; // The interval to check low replication on hlog's pipeline - private long checkLowReplicationInterval; + private final long checkLowReplicationInterval; public void addWAL(final WAL wal) { - if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) { + if (null == wals.putIfAbsent(wal, new RollController(wal))) { wal.registerWALActionsListener(new WALActionsListener.Base() { @Override public void logRollRequested(WALActionsListener.RollRequestReason reason) { - walNeedsRoll.put(wal, Boolean.TRUE); + RollController controller = wals.get(wal); + if (controller == null) { + wals.putIfAbsent(wal, new RollController(wal)); + controller = wals.get(wal); + } + controller.requestRoll(); // TODO logs will contend with each other here, replace with e.g. DelayedQueue synchronized(rollLog) { rollLog.set(true); @@ -84,8 +89,8 @@ public class LogRoller extends HasThread { } public void requestRollAll() { - for (WAL wal : walNeedsRoll.keySet()) { - walNeedsRoll.put(wal, Boolean.TRUE); + for (RollController controller : wals.values()) { + controller.requestRoll(); } synchronized(rollLog) { rollLog.set(true); @@ -98,7 +103,7 @@ public class LogRoller extends HasThread { super("LogRoller"); this.server = server; this.services = services; - this.rollperiod = this.server.getConfiguration(). + this.rollPeriod = this.server.getConfiguration(). getLong("hbase.regionserver.logroll.period", 3600000); this.threadWakeFrequency = this.server.getConfiguration(). 
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); @@ -120,9 +125,9 @@ public class LogRoller extends HasThread { */ void checkLowReplication(long now) { try { - for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) { + for (Entry<WAL, RollController> entry : wals.entrySet()) { WAL wal = entry.getKey(); - boolean neeRollAlready = entry.getValue(); + boolean neeRollAlready = entry.getValue().needsRoll(now); if(wal instanceof FSHLog && !neeRollAlready) { FSHLog hlog = (FSHLog)wal; if ((now - hlog.getLastTimeCheckLowReplication()) @@ -139,11 +144,16 @@ public class LogRoller extends HasThread { @Override public void run() { while (!server.isStopped()) { - long now = System.currentTimeMillis(); + long now = EnvironmentEdgeManager.currentTime(); checkLowReplication(now); - boolean periodic = false; if (!rollLog.get()) { - periodic = (now - this.lastrolltime) > this.rollperiod; + boolean periodic = false; + for (RollController controller : wals.values()) { + if (controller.needsPeriodicRoll(now)) { + periodic = true; + break; + } + } if (!periodic) { synchronized (rollLog) { try { @@ -156,23 +166,24 @@ public class LogRoller extends HasThread { } continue; } - // Time for periodic roll - if (LOG.isDebugEnabled()) { - LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed"); - } - } else if (LOG.isDebugEnabled()) { - LOG.debug("WAL roll requested"); } rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH try { - this.lastrolltime = now; - for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) { + for (Entry<WAL, RollController> entry : wals.entrySet()) { final WAL wal = entry.getKey(); + RollController controller = entry.getValue(); + if (controller.isRollRequested()) { + // WAL roll requested, fall through + LOG.debug("WAL " + wal + " roll requested"); + } else if (controller.needsPeriodicRoll(now)) { + // Time for periodic roll, fall through + LOG.debug("WAL " + wal + " roll period " + this.rollPeriod + "ms elapsed"); + } else { + continue; + } // Force the roll if the logroll.period is elapsed or if a roll was 
requested. // The returned value is an array of actual region names. - final byte [][] regionsToFlush = wal.rollWriter(periodic || - entry.getValue().booleanValue()); - walNeedsRoll.put(wal, Boolean.FALSE); + final byte [][] regionsToFlush = controller.rollWal(now); if (regionsToFlush != null) { for (byte [] r: regionsToFlush) scheduleFlush(r); } @@ -229,11 +240,52 @@ public class LogRoller extends HasThread { */ @VisibleForTesting public boolean walRollFinished() { - for (boolean needRoll : walNeedsRoll.values()) { - if (needRoll) { + long now = EnvironmentEdgeManager.currentTime(); + for (RollController controller : wals.values()) { + if (controller.needsRoll(now)) { return false; } } return true; } + + + /** + * Independently controls the roll of each WAL. When multiwal is in use, + * this avoids rolling all WALs together. See HBASE-24665 for details. + */ + protected class RollController { + private final WAL wal; + private final AtomicBoolean rollRequest; + private long lastRollTime; + + RollController(WAL wal) { + this.wal = wal; + this.rollRequest = new AtomicBoolean(false); + this.lastRollTime = EnvironmentEdgeManager.currentTime(); + } + + public void requestRoll() { + this.rollRequest.set(true); + } + + public byte[][] rollWal(long now) throws IOException { + this.lastRollTime = now; + byte[][] regionsToFlush = wal.rollWriter(true); + this.rollRequest.set(false); + return regionsToFlush; + } + + public boolean isRollRequested() { + return rollRequest.get(); + } + + public boolean needsPeriodicRoll(long now) { + return (now - this.lastRollTime) > rollPeriod; + } + + public boolean needsRoll(long now) { + return isRollRequested() || needsPeriodicRoll(now); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java new file mode 100644 index 00000000000..fac6b9936e7 --- /dev/null +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java @@ -0,0 +1,114 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category({RegionServerTests.class, MediumTests.class}) +public class TestLogRoller { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final int LOG_ROLL_PERIOD = 20 * 1000; + private static final String LOG_DIR = "WALs"; + 
private static final String ARCHIVE_DIR = "archiveWALs"; + private static final String WAL_PREFIX = "test-log-roller"; + private static Configuration CONF; + private static LogRoller ROLLER; + private static Path ROOT_DIR; + private static FileSystem FS; + + @Before + public void setup() throws Exception { + CONF = TEST_UTIL.getConfiguration(); + CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD); + CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300); + ROOT_DIR = TEST_UTIL.getRandomDir(); + FS = FileSystem.get(CONF); + HRegionServer server = Mockito.mock(HRegionServer.class); + Mockito.when(server.getConfiguration()).thenReturn(CONF); + RegionServerServices services = Mockito.mock(RegionServerServices.class); + ROLLER = new LogRoller(server, services); + ROLLER.start(); + } + + @After + public void tearDown() throws Exception { + ROLLER.interrupt(); + FS.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Verifies that each WAL is rolled separately. + */ + @Test + public void testRequestRollWithMultiWal() throws Exception { + // add multiple WALs + Map<FSHLog, Path> wals = new HashMap<FSHLog, Path>(); + for (int i = 1; i <= 3; i++) { + FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null, + true, WAL_PREFIX, "." 
+ i); + wal.rollWriter(true); + wals.put(wal, wal.getCurrentFileName()); + ROLLER.addWAL(wal); + Thread.sleep(1000); + } + + // request roll + Iterator<Map.Entry<FSHLog, Path>> it = wals.entrySet().iterator(); + Map.Entry<FSHLog, Path> walEntry = it.next(); + walEntry.getKey().requestLogRoll(); + Thread.sleep(5000); + + assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName()); + walEntry.setValue(walEntry.getKey().getCurrentFileName()); + while (it.hasNext()) { + walEntry = it.next(); + assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName()); + } + + // periodic roll + Thread.sleep(LOG_ROLL_PERIOD + 5000); + for (Map.Entry<FSHLog, Path> entry : wals.entrySet()) { + assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName()); + entry.getKey().close(); + } + } +}