HBASE-16960 RegionServer hang when aborting

Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
binlijin 2016-11-02 11:57:17 +08:00 committed by Yu Li
parent 4bd85f98fe
commit 6b957b69a7
5 changed files with 376 additions and 5 deletions

View File

@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
@ -55,6 +53,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.DrainBarrier;
@ -71,6 +70,8 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
* WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled. * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
@ -104,6 +105,8 @@ public abstract class AbstractFSWAL<W> implements WAL {
protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
/** /**
* file system instance * file system instance
*/ */
@ -162,6 +165,8 @@ public abstract class AbstractFSWAL<W> implements WAL {
protected final int slowSyncNs; protected final int slowSyncNs;
private final long walSyncTimeout;
// If > than this size, roll the log. // If > than this size, roll the log.
protected final long logrollsize; protected final long logrollsize;
@ -381,6 +386,8 @@ public abstract class AbstractFSWAL<W> implements WAL {
+ walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir); + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
this.slowSyncNs = this.slowSyncNs =
1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS); 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS);
this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout",
DEFAULT_WAL_SYNC_TIMEOUT_MS);
int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
// Presize our map of SyncFutures by handler objects. // Presize our map of SyncFutures by handler objects.
this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount); this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
@ -659,8 +666,14 @@ public abstract class AbstractFSWAL<W> implements WAL {
protected Span blockOnSync(final SyncFuture syncFuture) throws IOException { protected Span blockOnSync(final SyncFuture syncFuture) throws IOException {
// Now we have published the ringbuffer, halt the current thread until we get an answer back. // Now we have published the ringbuffer, halt the current thread until we get an answer back.
try { try {
syncFuture.get(); syncFuture.get(walSyncTimeout);
return syncFuture.getSpan(); return syncFuture.getSpan();
} catch (TimeoutIOException tioe) {
// SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
// still refer to it, so if this thread use it next time may get a wrong
// result.
this.syncFuturesByHandler.remove(Thread.currentThread());
throw tioe;
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.warn("Interrupted", ie); LOG.warn("Interrupted", ie);
throw convertInterruptedExceptionToIOException(ie); throw convertInterruptedExceptionToIOException(ie);

View File

@ -1042,6 +1042,12 @@ public class FSHLog extends AbstractFSWAL<Writer> {
} catch (Exception e) { } catch (Exception e) {
// Failed append. Record the exception. // Failed append. Record the exception.
this.exception = e; this.exception = e;
// invoking cleanupOutstandingSyncsOnException when append failed with exception,
// it will cleanup existing sync requests recorded in syncFutures but not offered to SyncRunner yet,
// so there won't be any sync future left over if no further truck published to disruptor.
cleanupOutstandingSyncsOnException(sequence,
this.exception instanceof DamagedWALException ? this.exception
: new DamagedWALException("On sync", this.exception));
// Return to keep processing events coming off the ringbuffer // Return to keep processing events coming off the ringbuffer
return; return;
} finally { } finally {

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Span; import org.apache.htrace.Span;
/** /**
@ -96,6 +98,7 @@ class SyncFuture {
this.doneTxid = NOT_DONE; this.doneTxid = NOT_DONE;
this.txid = txid; this.txid = txid;
this.span = span; this.span = span;
this.throwable = null;
return this; return this;
} }
@ -154,9 +157,16 @@ class SyncFuture {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
synchronized long get() throws InterruptedException, ExecutionException { synchronized long get(long timeout) throws InterruptedException,
ExecutionException, TimeoutIOException {
final long done = EnvironmentEdgeManager.currentTime() + timeout;
while (!isDone()) { while (!isDone()) {
wait(1000); wait(1000);
if (EnvironmentEdgeManager.currentTime() >= done) {
throw new TimeoutIOException("Failed to get sync result after "
+ timeout + " ms for txid=" + this.txid
+ ", WAL system stuck?");
}
} }
if (this.throwable != null) { if (this.throwable != null) {
throw new ExecutionException(this.throwable); throw new ExecutionException(this.throwable);

View File

@ -18,7 +18,6 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
@ -33,14 +32,21 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.DamagedWALException;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -49,6 +55,8 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -287,6 +295,291 @@ public class TestWALLockup {
} }
} }
/**
* Reproduce locking up that happens when there's no further syncs after
* append fails, and causing an isolated sync then infinite wait. See
* HBASE-16960. If below is broken, we will see this test timeout because it
* is locked up.
* <p/>
* Steps for reproduce:<br/>
* 1. Trigger server abort through dodgyWAL1<br/>
* 2. Add a {@link DummyWALActionsListener} to dodgyWAL2 to cause ringbuffer
* event handler thread sleep for a while thus keeping {@code endOfBatch}
* false<br/>
* 3. Publish a sync then an append which will throw exception, check whether
* the sync could return
*/
@Test(timeout = 20000)
public void testLockup16960() throws IOException {
// A WAL that we can have throw exceptions when a flag is set.
class DodgyFSLog extends FSHLog {
// Set this when want the WAL to start throwing exceptions.
volatile boolean throwException = false;
public DodgyFSLog(FileSystem fs, Path root, String logDir,
Configuration conf) throws IOException {
super(fs, root, logDir, conf);
}
@Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@Override
public void close() throws IOException {
w.close();
}
@Override
public void sync() throws IOException {
if (throwException) {
throw new IOException(
"FAKE! Failed to replace a bad datanode...SYNC");
}
w.sync();
}
@Override
public void append(Entry entry) throws IOException {
if (throwException) {
throw new IOException(
"FAKE! Failed to replace a bad datanode...APPEND");
}
w.append(entry);
}
@Override
public long getLength() throws IOException {
return w.getLength();
}
};
}
@Override
protected long doReplaceWriter(Path oldPath, Path newPath,
Writer nextWriter) throws IOException {
if (throwException) {
throw new FailedLogCloseException("oldPath=" + oldPath + ", newPath="
+ newPath);
}
long oldFileLen = 0L;
oldFileLen = super.doReplaceWriter(oldPath, newPath, nextWriter);
return oldFileLen;
}
}
// Mocked up server and regionserver services. Needed below.
Server server = new DummyServer(CONF, ServerName.valueOf(
"hostname1.example.org", 1234, 1L).toString());
RegionServerServices services = Mockito.mock(RegionServerServices.class);
CONF.setLong("hbase.regionserver.hlog.sync.timeout", 10000);
// OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL,
// go ahead with test.
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
DodgyFSLog dodgyWAL1 = new DodgyFSLog(fs, rootDir, getName(), CONF);
Path rootDir2 = new Path(dir + getName() + "2");
final DodgyFSLog dodgyWAL2 = new DodgyFSLog(fs, rootDir2, getName() + "2",
CONF);
// Add a listener to force ringbuffer event handler sleep for a while
dodgyWAL2.registerWALActionsListener(new DummyWALActionsListener());
// I need a log roller running.
LogRoller logRoller = new LogRoller(server, services);
logRoller.addWAL(dodgyWAL1);
logRoller.addWAL(dodgyWAL2);
// There is no 'stop' once a logRoller is running.. it just dies.
logRoller.start();
// Now get a region and start adding in edits.
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
final HRegion region = initHRegion(tableName, null, null, dodgyWAL1);
byte[] bytes = Bytes.toBytes(getName());
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
scopes.put(COLUMN_FAMILY_BYTES, 0);
try {
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
htd.getTableName(), scopes);
WALEdit edit = new WALEdit();
CellScanner CellScanner = put.cellScanner();
assertTrue(CellScanner.advance());
edit.add(CellScanner.current());
LOG.info("SET throwing of exception on append");
dodgyWAL1.throwException = true;
// This append provokes a WAL roll request
dodgyWAL1.append(region.getRegionInfo(), key, edit, true);
boolean exception = false;
try {
dodgyWAL1.sync();
} catch (Exception e) {
exception = true;
}
assertTrue("Did not get sync exception", exception);
// LogRoller call dodgyWAL1.rollWriter get FailedLogCloseException and
// cause server abort.
try {
// wait LogRoller exit.
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
final CountDownLatch latch = new CountDownLatch(1);
// make RingBufferEventHandler sleep 1s, so the following sync
// endOfBatch=false
key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
TableName.valueOf("sleep"), scopes);
dodgyWAL2.append(region.getRegionInfo(), key, edit, true);
Thread t = new Thread("Sync") {
public void run() {
try {
dodgyWAL2.sync();
} catch (IOException e) {
LOG.info("In sync", e);
}
latch.countDown();
LOG.info("Sync exiting");
};
};
t.setDaemon(true);
t.start();
try {
// make sure sync have published.
Thread.sleep(100);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
// make append throw DamagedWALException
key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
TableName.valueOf("DamagedWALException"), scopes);
dodgyWAL2.append(region.getRegionInfo(), key, edit, true);
while (latch.getCount() > 0) {
Threads.sleep(100);
}
assertTrue(server.isAborted());
} finally {
if (logRoller != null) {
logRoller.interrupt();
}
try {
if (region != null) {
region.close();
}
if (dodgyWAL1 != null) {
dodgyWAL1.close();
}
if (dodgyWAL2 != null) {
dodgyWAL2.close();
}
} catch (Exception e) {
LOG.info("On way out", e);
}
}
}
static class DummyServer implements Server {
private Configuration conf;
private String serverName;
private boolean isAborted = false;
public DummyServer(Configuration conf, String serverName) {
this.conf = conf;
this.serverName = serverName;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return null;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}
@Override
public ClusterConnection getConnection() {
return null;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public ServerName getServerName() {
return ServerName.valueOf(this.serverName);
}
@Override
public void abort(String why, Throwable e) {
LOG.info("Aborting " + serverName);
this.isAborted = true;
}
@Override
public boolean isAborted() {
return this.isAborted;
}
@Override
public void stop(String why) {
this.isAborted = true;
}
@Override
public boolean isStopped() {
return this.isAborted;
}
@Override
public ChoreService getChoreService() {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
return null;
}
}
static class DummyWALActionsListener extends WALActionsListener.Base {
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit)
throws IOException {
if (logKey.getTablename().getNameAsString().equalsIgnoreCase("sleep")) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (logKey.getTablename().getNameAsString()
.equalsIgnoreCase("DamagedWALException")) {
throw new DamagedWALException("Failed appending");
}
}
}
/** /**
* @return A region on which you must call * @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, SmallTests.class })
public class TestSyncFuture {
@Test(timeout = 60000)
public void testGet() throws Exception {
long timeout = 5000;
long txid = 100000;
SyncFuture syncFulture = new SyncFuture(txid, null);
syncFulture.done(txid, null);
assertEquals(txid, syncFulture.get(timeout));
syncFulture.reset(txid, null);
try {
syncFulture.get(timeout);
fail("Should have timed out but not");
} catch (TimeoutIOException e) {
// test passed
}
}
}