HBASE-21751 WAL creation fails during region open may cause region assign forever fail

Signed-off-by: Allan Yang <allan163@apache.org>
    Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Bing Xiao 2019-07-01 21:54:00 -07:00 committed by stack
parent 668f543cdc
commit a2a929f488
19 changed files with 322 additions and 23 deletions

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Throw when failed cleanup unsuccessful initialized wal
*/
@InterfaceAudience.Public
public class FailedCloseWALAfterInitializedErrorException
extends IOException {
private static final long serialVersionUID = -5463156587431677322L;
/**
* constructor with error msg and throwable
* @param msg message
* @param t throwable
*/
public FailedCloseWALAfterInitializedErrorException(String msg, Throwable t) {
super(msg, t);
}
/**
* constructor with error msg
* @param msg message
*/
public FailedCloseWALAfterInitializedErrorException(String msg) {
super(msg);
}
/**
* default constructor
*/
public FailedCloseWALAfterInitializedErrorException() {
super();
}
}

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
@ -2130,11 +2131,17 @@ public class HRegionServer extends HasThread implements
@Override
public WAL getWAL(RegionInfo regionInfo) throws IOException {
WAL wal = walFactory.getWAL(regionInfo);
if (this.walRoller != null) {
this.walRoller.addWAL(wal);
try {
WAL wal = walFactory.getWAL(regionInfo);
if (this.walRoller != null) {
this.walRoller.addWAL(wal);
}
return wal;
}catch (FailedCloseWALAfterInitializedErrorException ex) {
// see HBASE-21751 for details
abort("wal can not clean up after init failed", ex);
throw ex;
}
return wal;
}
public LogRoller getWalRoller() {

View File

@ -427,6 +427,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
this.implClassName = getClass().getSimpleName();
}
/**
* Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
*/
public void init() throws IOException {
rollWriter();
}
@Override
public void registerWALActionsListener(WALActionsListener listener) {
this.listeners.add(listener);

View File

@ -250,7 +250,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
rollWriter();
}
private static boolean waitingRoll(int epochAndState) {

View File

@ -217,9 +217,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
// rollWriter sets this.hdfs_out if it can.
rollWriter();
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
String hostingThreadName = Thread.currentThread().getName();

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -150,6 +151,20 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
return walCopy;
}
walCopy = createWAL();
boolean succ = false;
try {
walCopy.init();
succ = true;
} finally {
if (!succ) {
try {
walCopy.close();
} catch (Throwable t) {
throw new FailedCloseWALAfterInitializedErrorException(
"Failed close after init wal failed.", t);
}
}
}
wal = walCopy;
return walCopy;
} finally {

View File

@ -471,6 +471,11 @@ public class WALFactory {
return FSHLogProvider.createWriter(configuration, fs, path, false);
}
@VisibleForTesting
public String getFactoryId() {
return factoryId;
}
public final WALProvider getWALProvider() {
return this.provider;
}

View File

@ -172,6 +172,7 @@ public class TestFailedAppendAndSync {
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
dodgyWAL.init();
LogRoller logRoller = new LogRoller(server, services);
logRoller.addWAL(dodgyWAL);
logRoller.start();

View File

@ -402,6 +402,7 @@ public class TestHRegion {
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + testName);
FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
hLog.init();
HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
COLUMN_FAMILY_BYTES);
HStore store = region.getStore(COLUMN_FAMILY_BYTES);
@ -1207,6 +1208,7 @@ public class TestHRegion {
FailAppendFlushMarkerWAL wal =
new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
method, walConf);
wal.init();
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
try {
@ -1238,7 +1240,7 @@ public class TestHRegion {
wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
method, walConf);
wal.init();
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
region.put(put);
@ -2490,6 +2492,7 @@ public class TestHRegion {
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
hLog.init();
// This chunk creation is done throughout the code base. Do we want to move it into core?
// It is missing from this test. W/o it we NPE.
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@ -81,8 +80,9 @@ public class TestRegionIncrement {
}
private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
TEST_UTIL.getDataTestDir().toString(), conf);
wal.init();
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf,

View File

@ -215,6 +215,7 @@ public class TestWALLockup {
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
dodgyWAL.init();
Path originalWAL = dodgyWAL.getCurrentFileName();
// I need a log roller running.
LogRoller logRoller = new LogRoller(server, services);
@ -390,6 +391,7 @@ public class TestWALLockup {
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
final DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
dodgyWAL.init();
// I need a log roller running.
LogRoller logRoller = new LogRoller(server, services);
logRoller.addWAL(dodgyWAL);

View File

@ -436,6 +436,7 @@ public abstract class AbstractTestFSWAL {
String testName = currentTest.getMethodName();
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
CONF, null, true, null, null);
wal.init();
try {
wal.sync();
} finally {

View File

@ -1098,6 +1098,7 @@ public abstract class AbstractTestWALReplay {
private MockWAL createMockWAL() throws IOException {
MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
wal.init();
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);

View File

@ -67,8 +67,10 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix, GROUP, CHANNEL_CLASS);
AsyncFSWAL asyncFSWAL = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf,
listeners, failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS);
asyncFSWAL.init();
return asyncFSWAL;
}
@Override
@ -76,9 +78,8 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, final Runnable action)
throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix, GROUP, CHANNEL_CLASS) {
AsyncFSWAL asyncFSWAL = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners,
failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) {
@Override
void atHeadOfRingBufferEventHandlerAppend() {
action.run();
@ -86,5 +87,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
}
};
asyncFSWAL.init();
return asyncFSWAL;
}
}

View File

@ -66,7 +66,9 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
@Override
protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP, CHANNEL_CLASS);
AsyncFSWAL asyncFSWAL = new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP, CHANNEL_CLASS);
asyncFSWAL.init();
return asyncFSWAL;
}
}

View File

@ -74,8 +74,10 @@ public class TestFSHLog extends AbstractTestFSWAL {
protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix);
FSHLog fshLog = new FSHLog(fs, rootDir, walDir, archiveDir,
conf, listeners, failIfWALExists, prefix, suffix);
fshLog.init();
return fshLog;
}
@Override
@ -83,8 +85,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, final Runnable action)
throws IOException {
return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix) {
FSHLog fshLog = new FSHLog(fs, rootDir, walDir, archiveDir,
conf, listeners, failIfWALExists, prefix, suffix) {
@Override
void atHeadOfRingBufferEventHandlerAppend() {
@ -92,6 +94,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
super.atHeadOfRingBufferEventHandlerAppend();
}
};
fshLog.init();
return fshLog;
}
@Test
@ -100,6 +104,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
final String name = this.name.getMethodName();
FSHLog log = new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME,
CONF, null, true, null, null);
log.init();
try {
Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
ringBufferEventHandlerField.setAccessible(true);
@ -142,7 +147,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
try (FSHLog log =
new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF,
null, true, null, null)) {
log.init();
log.registerWALActionsListener(new WALActionsListener() {
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit)

View File

@ -104,6 +104,7 @@ public class TestWALDurability {
FileSystem fs = FileSystem.get(conf);
Path rootDir = new Path(dir + getName());
CustomFSLog customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
customFSLog.init();
HRegion region = initHRegion(tableName, null, null, customFSLog);
byte[] bytes = Bytes.toBytes(getName());
Put put = new Put(bytes);
@ -118,6 +119,7 @@ public class TestWALDurability {
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
fs = FileSystem.get(conf);
customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
customFSLog.init();
region = initHRegion(tableName, null, null, customFSLog);
customFSLog.resetSyncFlag();

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test WAL Init ERROR
*/
@Category({RegionServerTests.class, MediumTests.class})
public class TestWALOpenError {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALOpenError.class);
private static final Logger LOG = LoggerFactory.getLogger(TestWALOpenError.class);
protected static Configuration conf;
private static MiniDFSCluster cluster;
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected static Path hbaseDir;
protected static Path hbaseWALDir;
protected FileSystem fs;
protected Path dir;
protected WALFactory wals;
private ServerName currentServername;
@Rule
public final TestName currentTest = new TestName();
@Before
public void setUp() throws Exception {
fs = cluster.getFileSystem();
dir = new Path(hbaseDir, currentTest.getMethodName());
this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1);
wals = new WALFactory(conf, this.currentServername.toString());
}
@After
public void tearDown() throws Exception {
// testAppendClose closes the FileSystem, which will prevent us from closing cleanly here.
try {
wals.close();
} catch (IOException exception) {
LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" +
" may be the cause. Message: " + exception);
LOG.debug("Exception details for failure to close wal factory.", exception);
}
FileStatus[] entries = fs.listStatus(new Path("/"));
for (FileStatus dir : entries) {
fs.delete(dir.getPath(), true);
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniDFSCluster(3);
conf = TEST_UTIL.getConfiguration();
conf.set(WALFactory.WAL_PROVIDER, MyFSWalProvider.class.getName());
conf.set(WALFactory.META_WAL_PROVIDER, MyFSWalProvider.class.getName());
cluster = TEST_UTIL.getDFSCluster();
hbaseDir = TEST_UTIL.createRootDir();
hbaseWALDir = TEST_UTIL.createWALRootDir();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
private static MyFSLog myFSLogCreated;
private static boolean throwExceptionWhenCloseFSLogClose = false;
@Test
public void testWALClosedIfOpenError() throws IOException {
throwExceptionWhenCloseFSLogClose = false;
boolean hasFakeInitException = false;
try {
wals.getWAL(HRegionInfo.FIRST_META_REGIONINFO);
} catch (IOException ex) {
hasFakeInitException = ex.getMessage().contains("Fake init exception");
}
Assert.assertTrue(hasFakeInitException);
Assert.assertTrue(myFSLogCreated.closed);
FileStatus[] fileStatuses = CommonFSUtils.listStatus(fs, myFSLogCreated.walDir);
Assert.assertTrue(fileStatuses == null || fileStatuses.length == 0);
}
@Test
public void testThrowFailedCloseWalException() throws IOException {
throwExceptionWhenCloseFSLogClose = true;
boolean failedCloseWalException = false;
try {
wals.getWAL(HRegionInfo.FIRST_META_REGIONINFO);
} catch (FailedCloseWALAfterInitializedErrorException ex) {
failedCloseWalException = true;
}
Assert.assertTrue(failedCloseWalException);
}
public static class MyFSWalProvider extends FSHLogProvider {
@Override
protected MyFSLog createWAL() throws IOException {
MyFSLog myFSLog = new MyFSLog(CommonFSUtils.getWALFileSystem(conf),
CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.getFactoryId()),
getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
myFSLogCreated = myFSLog;
return myFSLog;
}
}
public static class MyFSLog extends FSHLog {
public MyFSLog(final FileSystem fs, final Path rootDir, final String logDir,
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
}
@Override
public void init() throws IOException {
super.init();
throw new IOException("Fake init exception");
}
@Override
public void close() throws IOException {
if (throwExceptionWhenCloseFSLogClose) {
throw new IOException("Fake close exception");
}
super.close();
}
}
}

View File

@ -48,6 +48,7 @@ public class TestWALReplay extends AbstractTestWALReplay {
@Override
protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
wal.init();
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);