HBASE-24665 all wal of RegionGroupingProvider together roll (#2021)
Co-authored-by: wen_yi <liu.wenwen@immomo.com> Signed-off-by: Anoop <anoopsamjohn@apache.org> Signed-off-by: Ramkrishna <ramkrishna@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
09e7ccd42d
commit
975cdf7b88
|
@ -65,7 +65,7 @@ public class LogRoller extends AbstractWALRoller<RegionServerServices> {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<WAL, Boolean> getWalNeedsRoll() {
|
||||
return this.walNeedsRoll;
|
||||
Map<WAL, RollController> getWalNeedsRoll() {
|
||||
return this.wals;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,13 +20,13 @@ package org.apache.hadoop.hbase.wal;
|
|||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -58,31 +58,31 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
|||
|
||||
protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
|
||||
|
||||
protected final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
|
||||
protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
|
||||
protected final T abortable;
|
||||
private volatile long lastRollTime = System.currentTimeMillis();
|
||||
// Period to roll log.
|
||||
private final long rollPeriod;
|
||||
private final int threadWakeFrequency;
|
||||
// The interval to check low replication on hlog's pipeline
|
||||
private long checkLowReplicationInterval;
|
||||
private final long checkLowReplicationInterval;
|
||||
|
||||
private volatile boolean running = true;
|
||||
|
||||
public void addWAL(WAL wal) {
|
||||
// check without lock first
|
||||
if (walNeedsRoll.containsKey(wal)) {
|
||||
if (wals.containsKey(wal)) {
|
||||
return;
|
||||
}
|
||||
// this is to avoid race between addWAL and requestRollAll.
|
||||
synchronized (this) {
|
||||
if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
|
||||
if (wals.putIfAbsent(wal, new RollController(wal)) == null) {
|
||||
wal.registerWALActionsListener(new WALActionsListener() {
|
||||
@Override
|
||||
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
|
||||
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
|
||||
synchronized (AbstractWALRoller.this) {
|
||||
walNeedsRoll.put(wal, Boolean.TRUE);
|
||||
RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal));
|
||||
controller.requestRoll();
|
||||
AbstractWALRoller.this.notifyAll();
|
||||
}
|
||||
}
|
||||
|
@ -93,9 +93,8 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
|||
|
||||
public void requestRollAll() {
|
||||
synchronized (this) {
|
||||
List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
|
||||
for (WAL wal : wals) {
|
||||
walNeedsRoll.put(wal, Boolean.TRUE);
|
||||
for (RollController controller : wals.values()) {
|
||||
controller.requestRoll();
|
||||
}
|
||||
notifyAll();
|
||||
}
|
||||
|
@ -115,9 +114,9 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
|||
*/
|
||||
private void checkLowReplication(long now) {
|
||||
try {
|
||||
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
|
||||
for (Entry<WAL, RollController> entry : wals.entrySet()) {
|
||||
WAL wal = entry.getKey();
|
||||
boolean needRollAlready = entry.getValue();
|
||||
boolean needRollAlready = entry.getValue().needsRoll(now);
|
||||
if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
|
||||
continue;
|
||||
}
|
||||
|
@ -133,7 +132,7 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
|||
// This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
|
||||
// failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
|
||||
// is already broken.
|
||||
for (WAL wal : walNeedsRoll.keySet()) {
|
||||
for (WAL wal : wals.keySet()) {
|
||||
// shutdown rather than close here since we are going to abort the RS and the wals need to be
|
||||
// split when recovery
|
||||
try {
|
||||
|
@ -148,53 +147,49 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
|||
@Override
|
||||
public void run() {
|
||||
while (running) {
|
||||
boolean periodic = false;
|
||||
long now = System.currentTimeMillis();
|
||||
checkLowReplication(now);
|
||||
periodic = (now - this.lastRollTime) > this.rollPeriod;
|
||||
if (periodic) {
|
||||
// Time for periodic roll, fall through
|
||||
LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
|
||||
} else {
|
||||
synchronized (this) {
|
||||
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
|
||||
// WAL roll requested, fall through
|
||||
LOG.debug("WAL roll requested");
|
||||
} else {
|
||||
try {
|
||||
wait(this.threadWakeFrequency);
|
||||
} catch (InterruptedException e) {
|
||||
// restore the interrupt state
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
// goto the beginning to check whether again whether we should fall through to roll
|
||||
// several WALs, and also check whether we should quit.
|
||||
continue;
|
||||
synchronized (this) {
|
||||
if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) {
|
||||
try {
|
||||
wait(this.threadWakeFrequency);
|
||||
} catch (InterruptedException e) {
|
||||
// restore the interrupt state
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
// goto the beginning to check whether again whether we should fall through to roll
|
||||
// several WALs, and also check whether we should quit.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
try {
|
||||
this.lastRollTime = System.currentTimeMillis();
|
||||
for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
|
||||
.hasNext();) {
|
||||
Entry<WAL, Boolean> entry = iter.next();
|
||||
for (Iterator<Entry<WAL, RollController>> iter = wals.entrySet().iterator();
|
||||
iter.hasNext();) {
|
||||
Entry<WAL, RollController> entry = iter.next();
|
||||
WAL wal = entry.getKey();
|
||||
// reset the flag in front to avoid missing roll request before we return from rollWriter.
|
||||
walNeedsRoll.put(wal, Boolean.FALSE);
|
||||
Map<byte[], List<byte[]>> regionsToFlush = null;
|
||||
RollController controller = entry.getValue();
|
||||
if (controller.isRollRequested()) {
|
||||
// WAL roll requested, fall through
|
||||
LOG.debug("WAL {} roll requested", wal);
|
||||
} else if (controller.needsPeriodicRoll(now)){
|
||||
// Time for periodic roll, fall through
|
||||
LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod);
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
// Force the roll if the logroll.period is elapsed or if a roll was requested.
|
||||
// The returned value is an collection of actual region and family names.
|
||||
regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
|
||||
Map<byte[], List<byte[]>> regionsToFlush = controller.rollWal(now);
|
||||
if (regionsToFlush != null) {
|
||||
for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
|
||||
scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
|
||||
}
|
||||
}
|
||||
} catch (WALClosedException e) {
|
||||
LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e);
|
||||
iter.remove();
|
||||
}
|
||||
if (regionsToFlush != null) {
|
||||
for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
|
||||
scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
|
||||
}
|
||||
}
|
||||
afterRoll(wal);
|
||||
}
|
||||
} catch (FailedLogCloseException | ConnectException e) {
|
||||
|
@ -232,7 +227,9 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
|||
* @return true if all WAL roll finished
|
||||
*/
|
||||
public boolean walRollFinished() {
|
||||
return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting();
|
||||
// TODO add a status field of roll in RollController
|
||||
return wals.values().stream().noneMatch(rc -> rc.needsRoll(System.currentTimeMillis()))
|
||||
&& isWaiting();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -249,4 +246,43 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
|||
running = false;
|
||||
interrupt();
|
||||
}
|
||||
|
||||
/**
|
||||
* Independently control the roll of each wal. When use multiwal,
|
||||
* can avoid all wal roll together. see HBASE-24665 for detail
|
||||
*/
|
||||
protected class RollController {
|
||||
private final WAL wal;
|
||||
private final AtomicBoolean rollRequest;
|
||||
private long lastRollTime;
|
||||
|
||||
RollController(WAL wal) {
|
||||
this.wal = wal;
|
||||
this.rollRequest = new AtomicBoolean(false);
|
||||
this.lastRollTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public void requestRoll() {
|
||||
this.rollRequest.set(true);
|
||||
}
|
||||
|
||||
public Map<byte[], List<byte[]>> rollWal(long now) throws IOException {
|
||||
this.lastRollTime = now;
|
||||
// reset the flag in front to avoid missing roll request before we return from rollWriter.
|
||||
this.rollRequest.set(false);
|
||||
return wal.rollWriter(true);
|
||||
}
|
||||
|
||||
public boolean isRollRequested() {
|
||||
return rollRequest.get();
|
||||
}
|
||||
|
||||
public boolean needsPeriodicRoll(long now) {
|
||||
return (now - this.lastRollTime) > rollPeriod;
|
||||
}
|
||||
|
||||
public boolean needsRoll(long now) {
|
||||
return isRollRequested() || needsPeriodicRoll(now);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,22 +19,28 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
@Category({RegionServerTests.class, MediumTests.class})
|
||||
public class TestLogRoller {
|
||||
|
@ -43,53 +49,104 @@ public class TestLogRoller {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestLogRoller.class);
|
||||
|
||||
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static final int logRollPeriod = 20 * 1000;
|
||||
private static final int LOG_ROLL_PERIOD = 20 * 1000;
|
||||
private static final String LOG_DIR = "WALs";
|
||||
private static final String ARCHIVE_DIR = "archiveWALs";
|
||||
private static final String WAL_PREFIX = "test-log-roller";
|
||||
private static Configuration CONF;
|
||||
private static LogRoller ROLLER;
|
||||
private static Path ROOT_DIR;
|
||||
private static FileSystem FS;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.period", logRollPeriod);
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
TableName name = TableName.valueOf("Test");
|
||||
TEST_UTIL.createTable(name, Bytes.toBytes("cf"));
|
||||
TEST_UTIL.waitTableAvailable(name);
|
||||
CONF = TEST_UTIL.getConfiguration();
|
||||
CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
|
||||
CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300);
|
||||
ROOT_DIR = TEST_UTIL.getRandomDir();
|
||||
FS = FileSystem.get(CONF);
|
||||
RegionServerServices services = Mockito.mock(RegionServerServices.class);
|
||||
Mockito.when(services.getConfiguration()).thenReturn(CONF);
|
||||
ROLLER = new LogRoller(services);
|
||||
ROLLER.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
ROLLER.close();
|
||||
FS.close();
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveClosedWAL() throws Exception {
|
||||
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
|
||||
Configuration conf = rs.getConfiguration();
|
||||
LogRoller logRoller = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWalRoller();
|
||||
int originalSize = logRoller.getWalNeedsRoll().size();
|
||||
FSHLog wal1 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
|
||||
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf);
|
||||
logRoller.addWAL(wal1);
|
||||
FSHLog wal2 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
|
||||
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf);
|
||||
logRoller.addWAL(wal2);
|
||||
FSHLog wal3 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
|
||||
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf);
|
||||
logRoller.addWAL(wal3);
|
||||
assertEquals(0, ROLLER.getWalNeedsRoll().size());
|
||||
for (int i = 1; i <= 3; i++) {
|
||||
FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
|
||||
true, WAL_PREFIX, getWALSuffix(i));
|
||||
ROLLER.addWAL(wal);
|
||||
}
|
||||
|
||||
assertEquals(originalSize + 3, logRoller.getWalNeedsRoll().size());
|
||||
assertTrue(logRoller.getWalNeedsRoll().containsKey(wal1));
|
||||
assertEquals(3, ROLLER.getWalNeedsRoll().size());
|
||||
Iterator<WAL> it = ROLLER.getWalNeedsRoll().keySet().iterator();
|
||||
WAL wal = it.next();
|
||||
assertTrue(ROLLER.getWalNeedsRoll().containsKey(wal));
|
||||
|
||||
wal1.close();
|
||||
Thread.sleep(2 * logRollPeriod);
|
||||
wal.close();
|
||||
Thread.sleep(LOG_ROLL_PERIOD + 5000);
|
||||
|
||||
assertEquals(originalSize + 2, logRoller.getWalNeedsRoll().size());
|
||||
assertFalse(logRoller.getWalNeedsRoll().containsKey(wal1));
|
||||
assertEquals(2, ROLLER.getWalNeedsRoll().size());
|
||||
assertFalse(ROLLER.getWalNeedsRoll().containsKey(wal));
|
||||
|
||||
wal2.close();
|
||||
wal3.close();
|
||||
Thread.sleep(2 * logRollPeriod);
|
||||
wal = it.next();
|
||||
wal.close();
|
||||
wal = it.next();
|
||||
wal.close();
|
||||
Thread.sleep(LOG_ROLL_PERIOD + 5000);
|
||||
|
||||
assertEquals(originalSize, logRoller.getWalNeedsRoll().size());
|
||||
assertEquals(0, ROLLER.getWalNeedsRoll().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* verify that each wal roll separately
|
||||
*/
|
||||
@Test
|
||||
public void testRequestRollWithMultiWal() throws Exception {
|
||||
// add multiple wal
|
||||
Map<FSHLog, Path> wals = new HashMap<>();
|
||||
for (int i = 1; i <= 3; i++) {
|
||||
FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
|
||||
true, WAL_PREFIX, getWALSuffix(i));
|
||||
wal.init();
|
||||
wals.put(wal, wal.getCurrentFileName());
|
||||
ROLLER.addWAL(wal);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
// request roll
|
||||
Iterator<Map.Entry<FSHLog, Path>> it = wals.entrySet().iterator();
|
||||
Map.Entry<FSHLog, Path> walEntry = it.next();
|
||||
walEntry.getKey().requestLogRoll();
|
||||
Thread.sleep(5000);
|
||||
|
||||
assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
|
||||
walEntry.setValue(walEntry.getKey().getCurrentFileName());
|
||||
while (it.hasNext()) {
|
||||
walEntry = it.next();
|
||||
assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
|
||||
}
|
||||
|
||||
// period roll
|
||||
Thread.sleep(LOG_ROLL_PERIOD + 5000);
|
||||
for (Map.Entry<FSHLog, Path> entry : wals.entrySet()) {
|
||||
assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName());
|
||||
entry.getKey().close();
|
||||
}
|
||||
}
|
||||
|
||||
private static String getWALSuffix(int id) {
|
||||
return "." + id;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue