HBASE-16960 RegionServer hang when aborting

Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
binlijin 2016-11-04 14:06:37 +08:00 committed by Yu Li
parent 51ba7cfde3
commit 5bfec397d1
4 changed files with 365 additions and 3 deletions

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -164,6 +165,8 @@ public class FSHLog implements WAL {
private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
/**
* The nexus at which all incoming handlers meet. Does appends and sync with an ordering.
* Appends and syncs are each put on the ring which means handlers need to
@ -281,6 +284,8 @@ public class FSHLog implements WAL {
private final int slowSyncNs;
private final long walSyncTimeout;
// If live datanode count is lower than the default replicas value,
// RollWriter will be triggered in each sync(So the RollWriter will be
// triggered one by one in a short time). Using it as a workaround to slow
@ -535,6 +540,8 @@ public class FSHLog implements WAL {
this.slowSyncNs =
1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
DEFAULT_SLOW_SYNC_TIME_MS);
this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout",
DEFAULT_WAL_SYNC_TIMEOUT_MS);
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
@ -1400,8 +1407,14 @@ public class FSHLog implements WAL {
private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
// Now we have published the ringbuffer, halt the current thread until we get an answer back.
try {
syncFuture.get();
syncFuture.get(walSyncTimeout);
return syncFuture.getSpan();
} catch (TimeoutIOException tioe) {
// SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
// still refer to it, so if this thread use it next time may get a wrong
// result.
this.syncFuturesByHandler.remove(Thread.currentThread());
throw tioe;
} catch (InterruptedException ie) {
LOG.warn("Interrupted", ie);
throw convertInterruptedExceptionToIOException(ie);
@ -1806,6 +1819,12 @@ public class FSHLog implements WAL {
} catch (Exception e) {
// Failed append. Record the exception.
this.exception = e;
// invoking cleanupOutstandingSyncsOnException when append failed with exception,
// it will cleanup existing sync requests recorded in syncFutures but not offered to SyncRunner yet,
// so there won't be any sync future left over if no further truck published to disruptor.
cleanupOutstandingSyncsOnException(sequence,
this.exception instanceof DamagedWALException ? this.exception
: new DamagedWALException("On sync", this.exception));
// Return to keep processing events coming off the ringbuffer
return;
} finally {

View File

@ -21,6 +21,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Span;
/**
@ -105,6 +107,7 @@ class SyncFuture {
this.doneSequence = NOT_DONE;
this.ringBufferSequence = sequence;
this.span = span;
this.throwable = null;
return this;
}
@ -162,9 +165,16 @@ class SyncFuture {
throw new UnsupportedOperationException();
}
public synchronized long get() throws InterruptedException, ExecutionException {
public synchronized long get(long timeout) throws InterruptedException,
ExecutionException, TimeoutIOException {
final long done = EnvironmentEdgeManager.currentTime() + timeout;
while (!isDone()) {
wait(1000);
if (EnvironmentEdgeManager.currentTime() >= done) {
throw new TimeoutIOException("Failed to get sync result after "
+ timeout + " ms for ringBufferSequence=" + this.ringBufferSequence
+ ", WAL system stuck?");
}
}
if (this.throwable != null) throw new ExecutionException(this.throwable);
return this.doneSequence;

View File

@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@ -31,14 +30,21 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.DamagedWALException;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -47,6 +53,8 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -281,6 +289,281 @@ public class TestWALLockup {
}
}
/**
* Reproduce locking up that happens when there's no further syncs after
* append fails, and causing an isolated sync then infinite wait. See
* HBASE-16960. If below is broken, we will see this test timeout because it
* is locked up.
* <p/>
* Steps for reproduce:<br/>
* 1. Trigger server abort through dodgyWAL1<br/>
* 2. Add a {@link DummyWALActionsListener} to dodgyWAL2 to cause ringbuffer
* event handler thread sleep for a while thus keeping {@code endOfBatch}
* false<br/>
* 3. Publish a sync then an append which will throw exception, check whether
* the sync could return
*/
@Test(timeout = 20000)
public void testLockup16960() throws IOException {
// A WAL that we can have throw exceptions when a flag is set.
class DodgyFSLog extends FSHLog {
// Set this when want the WAL to start throwing exceptions.
volatile boolean throwException = false;
public DodgyFSLog(FileSystem fs, Path root, String logDir,
Configuration conf) throws IOException {
super(fs, root, logDir, conf);
}
@Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@Override
public void close() throws IOException {
w.close();
}
@Override
public void sync() throws IOException {
if (throwException) {
throw new IOException(
"FAKE! Failed to replace a bad datanode...SYNC");
}
w.sync();
}
@Override
public void append(Entry entry) throws IOException {
if (throwException) {
throw new IOException(
"FAKE! Failed to replace a bad datanode...APPEND");
}
w.append(entry);
}
@Override
public long getLength() throws IOException {
return w.getLength();
}
};
}
@Override
public byte[][] rollWriter(boolean force) throws FailedLogCloseException,
IOException {
if (throwException) {
throw new FailedLogCloseException("testLockup16960");
}
return super.rollWriter(force);
}
}
// Mocked up server and regionserver services. Needed below.
Server server = new DummyServer(CONF, ServerName.valueOf(
"hostname1.example.org", 1234, 1L).toString());
RegionServerServices services = Mockito.mock(RegionServerServices.class);
CONF.setLong("hbase.regionserver.hlog.sync.timeout", 10000);
// OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL,
// go ahead with test.
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
DodgyFSLog dodgyWAL1 = new DodgyFSLog(fs, rootDir, getName(), CONF);
Path rootDir2 = new Path(dir + getName() + "2");
final DodgyFSLog dodgyWAL2 = new DodgyFSLog(fs, rootDir2, getName() + "2",
CONF);
// Add a listener to force ringbuffer event handler sleep for a while
dodgyWAL2.registerWALActionsListener(new DummyWALActionsListener());
// I need a log roller running.
LogRoller logRoller = new LogRoller(server, services);
logRoller.addWAL(dodgyWAL1);
logRoller.addWAL(dodgyWAL2);
// There is no 'stop' once a logRoller is running.. it just dies.
logRoller.start();
// Now get a region and start adding in edits.
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
final HRegion region = initHRegion(tableName, null, null, dodgyWAL1);
byte[] bytes = Bytes.toBytes(getName());
try {
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
htd.getTableName());
WALEdit edit = new WALEdit();
CellScanner CellScanner = put.cellScanner();
assertTrue(CellScanner.advance());
edit.add(CellScanner.current());
LOG.info("SET throwing of exception on append");
dodgyWAL1.throwException = true;
// This append provokes a WAL roll request
dodgyWAL1.append(htd, region.getRegionInfo(), key, edit, true);
boolean exception = false;
try {
dodgyWAL1.sync();
} catch (Exception e) {
exception = true;
}
assertTrue("Did not get sync exception", exception);
// LogRoller call dodgyWAL1.rollWriter get FailedLogCloseException and
// cause server abort.
try {
// wait LogRoller exit.
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
final CountDownLatch latch = new CountDownLatch(1);
// make RingBufferEventHandler sleep 1s, so the following sync
// endOfBatch=false
key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
TableName.valueOf("sleep"));
dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true);
Thread t = new Thread("Sync") {
public void run() {
try {
dodgyWAL2.sync();
} catch (IOException e) {
LOG.info("In sync", e);
}
latch.countDown();
LOG.info("Sync exiting");
};
};
t.setDaemon(true);
t.start();
try {
// make sure sync have published.
Thread.sleep(100);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
// make append throw DamagedWALException
key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
TableName.valueOf("DamagedWALException"));
dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true);
while (latch.getCount() > 0) {
Threads.sleep(100);
}
assertTrue(server.isAborted());
} finally {
if (logRoller != null) {
logRoller.interrupt();
}
try {
if (region != null) {
region.close();
}
if (dodgyWAL1 != null) {
dodgyWAL1.close();
}
if (dodgyWAL2 != null) {
dodgyWAL2.close();
}
} catch (Exception e) {
LOG.info("On way out", e);
}
}
}
static class DummyServer implements Server {
private Configuration conf;
private String serverName;
private boolean isAborted = false;
public DummyServer(Configuration conf, String serverName) {
this.conf = conf;
this.serverName = serverName;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return null;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}
@Override
public ClusterConnection getConnection() {
return null;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public ServerName getServerName() {
return ServerName.valueOf(this.serverName);
}
@Override
public void abort(String why, Throwable e) {
LOG.info("Aborting " + serverName);
this.isAborted = true;
}
@Override
public boolean isAborted() {
return this.isAborted;
}
@Override
public void stop(String why) {
this.isAborted = true;
}
@Override
public boolean isStopped() {
return this.isAborted;
}
@Override
public ChoreService getChoreService() {
return null;
}
}
static class DummyWALActionsListener extends WALActionsListener.Base {
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
WALEdit logEdit) throws IOException {
if (logKey.getTablename().getNameAsString().equalsIgnoreCase("sleep")) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (logKey.getTablename().getNameAsString()
.equalsIgnoreCase("DamagedWALException")) {
throw new DamagedWALException("Failed appending");
}
}
}
/**
* @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, SmallTests.class })
public class TestSyncFuture {
@Test(timeout = 60000)
public void testGet() throws Exception {
long timeout = 5000;
long txid = 100000;
SyncFuture syncFulture = new SyncFuture();
syncFulture.reset(txid);
syncFulture.done(txid, null);
assertEquals(txid, syncFulture.get(timeout));
syncFulture.reset(txid);
try {
syncFulture.get(timeout);
fail("Should have timed out but not");
} catch (TimeoutIOException e) {
// test passed
}
}
}