HBASE-24849 Branch-1 Backport : HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll (#2194)

Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
WenFeiYi 2020-10-16 20:42:18 +08:00 committed by GitHub
parent fb25a7d530
commit e06695112a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 194 additions and 28 deletions

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
@ -56,23 +57,27 @@ public class LogRoller extends HasThread {
private static final Log LOG = LogFactory.getLog(LogRoller.class);
private final ReentrantLock rollLock = new ReentrantLock();
private final AtomicBoolean rollLog = new AtomicBoolean(false);
private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
new ConcurrentHashMap<WAL, Boolean>();
private final ConcurrentHashMap<WAL, RollController> wals =
new ConcurrentHashMap<WAL, RollController>();
private final Server server;
protected final RegionServerServices services;
private volatile long lastrolltime = System.currentTimeMillis();
// Period to roll log.
private final long rollperiod;
private final long rollPeriod;
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
private long checkLowReplicationInterval;
private final long checkLowReplicationInterval;
public void addWAL(final WAL wal) {
if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
if (null == wals.putIfAbsent(wal, new RollController(wal))) {
wal.registerWALActionsListener(new WALActionsListener.Base() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
walNeedsRoll.put(wal, Boolean.TRUE);
RollController controller = wals.get(wal);
if (controller == null) {
wals.putIfAbsent(wal, new RollController(wal));
controller = wals.get(wal);
}
controller.requestRoll();
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
synchronized(rollLog) {
rollLog.set(true);
@ -84,8 +89,8 @@ public class LogRoller extends HasThread {
}
public void requestRollAll() {
for (WAL wal : walNeedsRoll.keySet()) {
walNeedsRoll.put(wal, Boolean.TRUE);
for (RollController controller : wals.values()) {
controller.requestRoll();
}
synchronized(rollLog) {
rollLog.set(true);
@ -98,7 +103,7 @@ public class LogRoller extends HasThread {
super("LogRoller");
this.server = server;
this.services = services;
this.rollperiod = this.server.getConfiguration().
this.rollPeriod = this.server.getConfiguration().
getLong("hbase.regionserver.logroll.period", 3600000);
this.threadWakeFrequency = this.server.getConfiguration().
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
@ -120,9 +125,9 @@ public class LogRoller extends HasThread {
*/
void checkLowReplication(long now) {
try {
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
for (Entry<WAL, RollController> entry : wals.entrySet()) {
WAL wal = entry.getKey();
boolean neeRollAlready = entry.getValue();
boolean neeRollAlready = entry.getValue().needsRoll(now);
if(wal instanceof FSHLog && !neeRollAlready) {
FSHLog hlog = (FSHLog)wal;
if ((now - hlog.getLastTimeCheckLowReplication())
@ -139,11 +144,16 @@ public class LogRoller extends HasThread {
@Override
public void run() {
while (!server.isStopped()) {
long now = System.currentTimeMillis();
long now = EnvironmentEdgeManager.currentTime();
checkLowReplication(now);
boolean periodic = false;
if (!rollLog.get()) {
periodic = (now - this.lastrolltime) > this.rollperiod;
boolean periodic = false;
for (RollController controller : wals.values()) {
if (controller.needsPeriodicRoll(now)) {
periodic = true;
break;
}
}
if (!periodic) {
synchronized (rollLog) {
try {
@ -156,23 +166,24 @@ public class LogRoller extends HasThread {
}
continue;
}
// Time for periodic roll
if (LOG.isDebugEnabled()) {
LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
}
} else if (LOG.isDebugEnabled()) {
LOG.debug("WAL roll requested");
}
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
try {
this.lastrolltime = now;
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
for (Entry<WAL, RollController> entry : wals.entrySet()) {
final WAL wal = entry.getKey();
RollController controller = entry.getValue();
if (controller.isRollRequested()) {
// WAL roll requested, fall through
LOG.debug("WAL " + wal + " roll requested");
} else if (controller.needsPeriodicRoll(now)) {
// Time for periodic roll, fall through
LOG.debug("WAL " + wal + " roll period " + this.rollPeriod + "ms elapsed");
} else {
continue;
}
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an array of actual region names.
final byte [][] regionsToFlush = wal.rollWriter(periodic ||
entry.getValue().booleanValue());
walNeedsRoll.put(wal, Boolean.FALSE);
final byte [][] regionsToFlush = controller.rollWal(now);
if (regionsToFlush != null) {
for (byte [] r: regionsToFlush) scheduleFlush(r);
}
@ -229,11 +240,52 @@ public class LogRoller extends HasThread {
*/
@VisibleForTesting
public boolean walRollFinished() {
for (boolean needRoll : walNeedsRoll.values()) {
if (needRoll) {
long now = EnvironmentEdgeManager.currentTime();
for (RollController controller : wals.values()) {
if (controller.needsRoll(now)) {
return false;
}
}
return true;
}
/**
* Independently control the roll of each wal. When use multiwal,
* can avoid all wal roll together. see HBASE-24665 for detail
*/
protected class RollController {
private final WAL wal;
private final AtomicBoolean rollRequest;
private long lastRollTime;
RollController(WAL wal) {
this.wal = wal;
this.rollRequest = new AtomicBoolean(false);
this.lastRollTime = EnvironmentEdgeManager.currentTime();
}
public void requestRoll() {
this.rollRequest.set(true);
}
public byte[][] rollWal(long now) throws IOException {
this.lastRollTime = now;
byte[][] regionsToFlush = wal.rollWriter(true);
this.rollRequest.set(false);
return regionsToFlush;
}
public boolean isRollRequested() {
return rollRequest.get();
}
public boolean needsPeriodicRoll(long now) {
return (now - this.lastRollTime) > rollPeriod;
}
public boolean needsRoll(long now) {
return isRollRequested() || needsPeriodicRoll(now);
}
}
}

View File

@ -0,0 +1,114 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@Category({RegionServerTests.class, MediumTests.class})
public class TestLogRoller {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final int LOG_ROLL_PERIOD = 20 * 1000;
private static final String LOG_DIR = "WALs";
private static final String ARCHIVE_DIR = "archiveWALs";
private static final String WAL_PREFIX = "test-log-roller";
private static Configuration CONF;
private static LogRoller ROLLER;
private static Path ROOT_DIR;
private static FileSystem FS;
@Before
public void setup() throws Exception {
CONF = TEST_UTIL.getConfiguration();
CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300);
ROOT_DIR = TEST_UTIL.getRandomDir();
FS = FileSystem.get(CONF);
HRegionServer server = Mockito.mock(HRegionServer.class);
Mockito.when(server.getConfiguration()).thenReturn(CONF);
RegionServerServices services = Mockito.mock(RegionServerServices.class);
ROLLER = new LogRoller(server, services);
ROLLER.start();
}
@After
public void tearDown() throws Exception {
ROLLER.interrupt();
FS.close();
TEST_UTIL.shutdownMiniCluster();
}
/**
* verify that each wal roll separately
*/
@Test
public void testRequestRollWithMultiWal() throws Exception {
// add multiple wal
Map<FSHLog, Path> wals = new HashMap<FSHLog, Path>();
for (int i = 1; i <= 3; i++) {
FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
true, WAL_PREFIX, "." + i);
wal.rollWriter(true);
wals.put(wal, wal.getCurrentFileName());
ROLLER.addWAL(wal);
Thread.sleep(1000);
}
// request roll
Iterator<Map.Entry<FSHLog, Path>> it = wals.entrySet().iterator();
Map.Entry<FSHLog, Path> walEntry = it.next();
walEntry.getKey().requestLogRoll();
Thread.sleep(5000);
assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
walEntry.setValue(walEntry.getKey().getCurrentFileName());
while (it.hasNext()) {
walEntry = it.next();
assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
}
// period roll
Thread.sleep(LOG_ROLL_PERIOD + 5000);
for (Map.Entry<FSHLog, Path> entry : wals.entrySet()) {
assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName());
entry.getKey().close();
}
}
}