HBASE-20458 Support removing a WAL from LogRoller
This commit is contained in:
parent
2d203c4479
commit
1bea678ef8
|
@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -30,6 +32,7 @@ import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALClosedException;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.HasThread;
|
import org.apache.hadoop.hbase.util.HasThread;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
|
@ -177,17 +180,24 @@ public class LogRoller extends HasThread implements Closeable {
|
||||||
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
|
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
|
||||||
try {
|
try {
|
||||||
this.lastrolltime = now;
|
this.lastrolltime = now;
|
||||||
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
|
for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
|
||||||
|
.hasNext();) {
|
||||||
|
Entry<WAL, Boolean> entry = iter.next();
|
||||||
final WAL wal = entry.getKey();
|
final WAL wal = entry.getKey();
|
||||||
// Force the roll if the logroll.period is elapsed or if a roll was requested.
|
// Force the roll if the logroll.period is elapsed or if a roll was requested.
|
||||||
// The returned value is an array of actual region names.
|
// The returned value is an array of actual region names.
|
||||||
final byte [][] regionsToFlush = wal.rollWriter(periodic ||
|
try {
|
||||||
entry.getValue().booleanValue());
|
final byte[][] regionsToFlush =
|
||||||
walNeedsRoll.put(wal, Boolean.FALSE);
|
wal.rollWriter(periodic || entry.getValue().booleanValue());
|
||||||
if (regionsToFlush != null) {
|
walNeedsRoll.put(wal, Boolean.FALSE);
|
||||||
for (byte[] r : regionsToFlush) {
|
if (regionsToFlush != null) {
|
||||||
scheduleFlush(r);
|
for (byte[] r : regionsToFlush) {
|
||||||
|
scheduleFlush(r);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} catch (WALClosedException e) {
|
||||||
|
LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e);
|
||||||
|
iter.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (FailedLogCloseException e) {
|
} catch (FailedLogCloseException e) {
|
||||||
|
@ -252,4 +262,9 @@ public class LogRoller extends HasThread implements Closeable {
|
||||||
running = false;
|
running = false;
|
||||||
interrupt();
|
interrupt();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Map<WAL, Boolean> getWalNeedsRoll() {
|
||||||
|
return this.walNeedsRoll;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -754,15 +754,14 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
|
public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
|
||||||
rollWriterLock.lock();
|
rollWriterLock.lock();
|
||||||
try {
|
try {
|
||||||
|
if (this.closed) {
|
||||||
|
throw new WALClosedException("WAL has been closed");
|
||||||
|
}
|
||||||
// Return if nothing to flush.
|
// Return if nothing to flush.
|
||||||
if (!force && this.writer != null && this.numEntries.get() <= 0) {
|
if (!force && this.writer != null && this.numEntries.get() <= 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
byte[][] regionsToFlush = null;
|
byte[][] regionsToFlush = null;
|
||||||
if (this.closed) {
|
|
||||||
LOG.debug("WAL closed. Skipping rolling of writer");
|
|
||||||
return regionsToFlush;
|
|
||||||
}
|
|
||||||
try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
|
try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
|
||||||
Path oldPath = getOldPath();
|
Path oldPath = getOldPath();
|
||||||
Path newPath = getNewPath();
|
Path newPath = getNewPath();
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.regionserver.LogRoller;
|
||||||
|
import org.apache.hadoop.hbase.replication.SyncReplicationState;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown when {@link LogRoller} try to roll writer but the WAL already was closed. This may
|
||||||
|
* happened when peer's sync replication state was transited from
|
||||||
|
* {@link SyncReplicationState#ACTIVE} to {@link SyncReplicationState#DOWNGRADE_ACTIVE} and the
|
||||||
|
* region's WAL was changed to a new one. But the old WAL was still left in {@link LogRoller}.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class WALClosedException extends IOException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = -3183198896865290678L;
|
||||||
|
|
||||||
|
public WALClosedException() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param msg exception message
|
||||||
|
*/
|
||||||
|
public WALClosedException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,90 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({RegionServerTests.class, MediumTests.class})
|
||||||
|
public class TestLogRoller {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestLogRoller.class);
|
||||||
|
|
||||||
|
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static final int logRollPeriod = 20 * 1000;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.period", logRollPeriod);
|
||||||
|
TEST_UTIL.startMiniCluster(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveClosedWAL() throws Exception {
|
||||||
|
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
|
||||||
|
Configuration conf = rs.getConfiguration();
|
||||||
|
LogRoller logRoller = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWalRoller();
|
||||||
|
int originalSize = logRoller.getWalNeedsRoll().size();
|
||||||
|
FSHLog wal1 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
|
||||||
|
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf);
|
||||||
|
logRoller.addWAL(wal1);
|
||||||
|
FSHLog wal2 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
|
||||||
|
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf);
|
||||||
|
logRoller.addWAL(wal2);
|
||||||
|
FSHLog wal3 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
|
||||||
|
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf);
|
||||||
|
logRoller.addWAL(wal3);
|
||||||
|
|
||||||
|
assertEquals(originalSize + 3, logRoller.getWalNeedsRoll().size());
|
||||||
|
assertTrue(logRoller.getWalNeedsRoll().containsKey(wal1));
|
||||||
|
|
||||||
|
wal1.close();
|
||||||
|
Thread.sleep(2 * logRollPeriod);
|
||||||
|
|
||||||
|
assertEquals(originalSize + 2, logRoller.getWalNeedsRoll().size());
|
||||||
|
assertFalse(logRoller.getWalNeedsRoll().containsKey(wal1));
|
||||||
|
|
||||||
|
wal2.close();
|
||||||
|
wal3.close();
|
||||||
|
Thread.sleep(2 * logRollPeriod);
|
||||||
|
|
||||||
|
assertEquals(originalSize, logRoller.getWalNeedsRoll().size());
|
||||||
|
}
|
||||||
|
}
|
|
@ -473,4 +473,13 @@ public abstract class AbstractTestFSWAL {
|
||||||
assertNull(key.getWriteEntry());
|
assertNull(key.getWriteEntry());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected = WALClosedException.class)
|
||||||
|
public void testRollWriterForClosedWAL() throws IOException {
|
||||||
|
String testName = currentTest.getMethodName();
|
||||||
|
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
|
||||||
|
CONF, null, true, null, null);
|
||||||
|
wal.close();
|
||||||
|
wal.rollWriter();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue