From 6b957b69a7e0b1a01385a2b7be42dd995784dd06 Mon Sep 17 00:00:00 2001 From: binlijin Date: Wed, 2 Nov 2016 11:57:17 +0800 Subject: [PATCH] HBASE-16960 RegionServer hang when aborting Signed-off-by: Yu Li --- .../hbase/regionserver/wal/AbstractFSWAL.java | 19 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 6 + .../hbase/regionserver/wal/SyncFuture.java | 12 +- .../hbase/regionserver/TestWALLockup.java | 295 +++++++++++++++++- .../regionserver/wal/TestSyncFuture.java | 49 +++ 5 files changed, 376 insertions(+), 5 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index c1e8019dc74..0ef0cf7cec3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; -import com.google.common.annotations.VisibleForTesting; - import java.io.IOException; import java.io.InterruptedIOException; import java.lang.management.ManagementFactory; @@ -55,6 +53,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -71,6 +70,8 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; +import com.google.common.annotations.VisibleForTesting; + /** * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled. @@ -104,6 +105,8 @@ public abstract class AbstractFSWAL implements WAL { protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms + private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min + /** * file system instance */ @@ -162,6 +165,8 @@ public abstract class AbstractFSWAL implements WAL { protected final int slowSyncNs; + private final long walSyncTimeout; + // If > than this size, roll the log. protected final long logrollsize; @@ -381,6 +386,8 @@ public abstract class AbstractFSWAL implements WAL { + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir); this.slowSyncNs = 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS); + this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout", + DEFAULT_WAL_SYNC_TIMEOUT_MS); int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); // Presize our map of SyncFutures by handler objects. this.syncFuturesByHandler = new ConcurrentHashMap(maxHandlersCount); @@ -659,8 +666,14 @@ public abstract class AbstractFSWAL implements WAL { protected Span blockOnSync(final SyncFuture syncFuture) throws IOException { // Now we have published the ringbuffer, halt the current thread until we get an answer back. 
try { - syncFuture.get(); + syncFuture.get(walSyncTimeout); return syncFuture.getSpan(); + } catch (TimeoutIOException tioe) { + // SyncFutures are reused per handler thread. If a TimeoutIOException happens, the + // ringbuffer may still refer to this future, so reusing it on a later call from this + // thread could return a wrong result. Drop it from the per-thread cache. + this.syncFuturesByHandler.remove(Thread.currentThread()); + throw tioe; } catch (InterruptedException ie) { LOG.warn("Interrupted", ie); throw convertInterruptedExceptionToIOException(ie);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 142ab6393d7..edf698e39d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1042,6 +1042,12 @@ public class FSHLog extends AbstractFSWAL { } catch (Exception e) { // Failed append. Record the exception. this.exception = e; + // Invoke cleanupOutstandingSyncsOnException when the append fails with an exception: + // it cleans up sync requests already recorded in syncFutures but not yet offered to a SyncRunner, + // so no sync future is left hanging if no further truck is published to the disruptor. + cleanupOutstandingSyncsOnException(sequence, + this.exception instanceof DamagedWALException ? this.exception + : new DamagedWALException("On sync", this.exception)); // Return to keep processing events coming off the ringbuffer return; } finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index 5ec218a9f4d..6e302a30386 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.htrace.Span; /** @@ -96,6 +98,7 @@ class SyncFuture { this.doneTxid = NOT_DONE; this.txid = txid; this.span = span; + this.throwable = null; return this; } @@ -154,9 +157,16 @@ class SyncFuture { throw new UnsupportedOperationException(); } - synchronized long get() throws InterruptedException, ExecutionException { + synchronized long get(long timeout) throws InterruptedException, + ExecutionException, TimeoutIOException { + final long done = EnvironmentEdgeManager.currentTime() + timeout; while (!isDone()) { wait(1000); + if (EnvironmentEdgeManager.currentTime() >= done) { + throw new TimeoutIOException("Failed to get sync result after " + + timeout + " ms for txid=" + this.txid + + ", WAL system stuck?"); + } } if (this.throwable != null) { throw new ExecutionException(this.throwable);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index e9bb4681619..63fbb691f7c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; - import static
org.junit.Assert.assertTrue; import java.io.IOException; @@ -33,14 +32,21 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.wal.DamagedWALException; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -49,6 +55,8 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALProvider.Writer; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -287,6 +295,291 @@ public class TestWALLockup { } } + /** + * Reproduce the lockup that happens when there are no further syncs after an + * append fails: the failure leaves an isolated sync behind that then waits forever. See + * HBASE-16960. If the fix is broken, we will see this test time out because it + * is locked up. + *

+ * Steps to reproduce:
+ * 1. Trigger a server abort through dodgyWAL1
+ * 2. Add a {@link DummyWALActionsListener} to dodgyWAL2 to make the ring buffer + * event handler thread sleep for a while, thus keeping {@code endOfBatch} + * false
+ * 3. Publish a sync, then an append that throws an exception, and check whether + * the sync can still return + */ + @Test(timeout = 20000) + public void testLockup16960() throws IOException { + // A WAL that we can have throw exceptions when a flag is set. + class DodgyFSLog extends FSHLog { + // Set this when you want the WAL to start throwing exceptions. + volatile boolean throwException = false; + + public DodgyFSLog(FileSystem fs, Path root, String logDir, + Configuration conf) throws IOException { + super(fs, root, logDir, conf); + } + + @Override + protected Writer createWriterInstance(Path path) throws IOException { + final Writer w = super.createWriterInstance(path); + return new Writer() { + @Override + public void close() throws IOException { + w.close(); + } + + @Override + public void sync() throws IOException { + if (throwException) { + throw new IOException( + "FAKE! Failed to replace a bad datanode...SYNC"); + } + w.sync(); + } + + @Override + public void append(Entry entry) throws IOException { + if (throwException) { + throw new IOException( + "FAKE! Failed to replace a bad datanode...APPEND"); + } + w.append(entry); + } + + @Override + public long getLength() throws IOException { + return w.getLength(); + } + }; + } + + @Override + protected long doReplaceWriter(Path oldPath, Path newPath, + Writer nextWriter) throws IOException { + if (throwException) { + throw new FailedLogCloseException("oldPath=" + oldPath + ", newPath=" + + newPath); + } + long oldFileLen = 0L; + oldFileLen = super.doReplaceWriter(oldPath, newPath, nextWriter); + return oldFileLen; + } + } + + // Mocked up server and regionserver services. Needed below. + Server server = new DummyServer(CONF, ServerName.valueOf( + "hostname1.example.org", 1234, 1L).toString()); + RegionServerServices services = Mockito.mock(RegionServerServices.class); + + CONF.setLong("hbase.regionserver.hlog.sync.timeout", 10000); + + // OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL, + // go ahead with the test. + FileSystem fs = FileSystem.get(CONF); + Path rootDir = new Path(dir + getName()); + DodgyFSLog dodgyWAL1 = new DodgyFSLog(fs, rootDir, getName(), CONF); + + Path rootDir2 = new Path(dir + getName() + "2"); + final DodgyFSLog dodgyWAL2 = new DodgyFSLog(fs, rootDir2, getName() + "2", + CONF); + // Add a listener to force the ring buffer event handler to sleep for a while + dodgyWAL2.registerWALActionsListener(new DummyWALActionsListener()); + + // I need a log roller running. + LogRoller logRoller = new LogRoller(server, services); + logRoller.addWAL(dodgyWAL1); + logRoller.addWAL(dodgyWAL2); + // There is no 'stop' once a logRoller is running... it just dies. + logRoller.start(); + // Now get a region and start adding in edits. 
+ HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME); + final HRegion region = initHRegion(tableName, null, null, dodgyWAL1); + byte[] bytes = Bytes.toBytes(getName()); + NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>( + Bytes.BYTES_COMPARATOR); + scopes.put(COLUMN_FAMILY_BYTES, 0); + try { + Put put = new Put(bytes); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); + WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), + htd.getTableName(), scopes); + WALEdit edit = new WALEdit(); + CellScanner cellScanner = put.cellScanner(); + assertTrue(cellScanner.advance()); + edit.add(cellScanner.current()); + + LOG.info("SET throwing of exception on append"); + dodgyWAL1.throwException = true; + // This append provokes a WAL roll request + dodgyWAL1.append(region.getRegionInfo(), key, edit, true); + boolean exception = false; + try { + dodgyWAL1.sync(); + } catch (Exception e) { + exception = true; + } + assertTrue("Did not get sync exception", exception); + + // The LogRoller calls dodgyWAL1.rollWriter, gets a FailedLogCloseException and + // causes a server abort. + try { + // Wait for the LogRoller to exit. + Thread.sleep(50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + final CountDownLatch latch = new CountDownLatch(1); + + // Make the RingBufferEventHandler sleep 1s, so the following sync is handled with + // endOfBatch=false + key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), + TableName.valueOf("sleep"), scopes); + dodgyWAL2.append(region.getRegionInfo(), key, edit, true); + + Thread t = new Thread("Sync") { + public void run() { + try { + dodgyWAL2.sync(); + } catch (IOException e) { + LOG.info("In sync", e); + } + latch.countDown(); + LOG.info("Sync exiting"); + }; + }; + t.setDaemon(true); + t.start(); + try { + // Make sure the sync has been published. 
+ Thread.sleep(100); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + // make append throw DamagedWALException + key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), + TableName.valueOf("DamagedWALException"), scopes); + dodgyWAL2.append(region.getRegionInfo(), key, edit, true); + + while (latch.getCount() > 0) { + Threads.sleep(100); + } + assertTrue(server.isAborted()); + } finally { + if (logRoller != null) { + logRoller.interrupt(); + } + try { + if (region != null) { + region.close(); + } + if (dodgyWAL1 != null) { + dodgyWAL1.close(); + } + if (dodgyWAL2 != null) { + dodgyWAL2.close(); + } + } catch (Exception e) { + LOG.info("On way out", e); + } + } + } + + static class DummyServer implements Server { + private Configuration conf; + private String serverName; + private boolean isAborted = false; + + public DummyServer(Configuration conf, String serverName) { + this.conf = conf; + this.serverName = serverName; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return null; + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + return null; + } + + @Override + public ClusterConnection getConnection() { + return null; + } + + @Override + public MetaTableLocator getMetaTableLocator() { + return null; + } + + @Override + public ServerName getServerName() { + return ServerName.valueOf(this.serverName); + } + + @Override + public void abort(String why, Throwable e) { + LOG.info("Aborting " + serverName); + this.isAborted = true; + } + + @Override + public boolean isAborted() { + return this.isAborted; + } + + @Override + public void stop(String why) { + this.isAborted = true; + } + + @Override + public boolean isStopped() { + return this.isAborted; + } + + @Override + public ChoreService getChoreService() { + return null; + } + + @Override + public ClusterConnection getClusterConnection() { + return null; + } + + } + + static class DummyWALActionsListener extends WALActionsListener.Base { + + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) + throws IOException { + if (logKey.getTablename().getNameAsString().equalsIgnoreCase("sleep")) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if (logKey.getTablename().getNameAsString() + .equalsIgnoreCase("DamagedWALException")) { + throw new DamagedWALException("Failed appending"); + } + } + + } + /** * @return A region on which you must call * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java new file mode 100644 index 00000000000..2cba040403b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestSyncFuture { + + @Test(timeout = 60000) + public void testGet() throws Exception { + long timeout = 5000; + long txid = 100000; + SyncFuture syncFuture = new SyncFuture(txid, null); + syncFuture.done(txid, null); + assertEquals(txid, syncFuture.get(timeout)); + + syncFuture.reset(txid, null); + try { + syncFuture.get(timeout); + fail("Should have timed out but did not"); + } catch (TimeoutIOException e) { + // Expected: the reset future is never completed, so get() times out. + } + } + +}
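Note on the core mechanism, for readers skimming the patch: the hang fixed here came from SyncFuture.get() waiting on its monitor with no deadline, so a sync whose completion was never going to arrive blocked its handler thread forever. The patch bounds that wait with hbase.regionserver.hlog.sync.timeout (default 5 minutes) and surfaces a TimeoutIOException. The standalone sketch below illustrates just that bounded-wait pattern; the class and method names (BoundedWaitExample, markDone, await) are invented for the illustration and are not HBase APIs.

// Illustrative sketch only (not part of the patch): a monitor wait bounded by a
// deadline computed once up front, failing with an exception instead of hanging.
import java.io.IOException;
import java.io.InterruptedIOException;

public class BoundedWaitExample {
  private final Object monitor = new Object();
  private volatile boolean done = false;

  /** Signals completion, waking any waiter (plays the role of the SyncRunner finishing). */
  public void markDone() {
    synchronized (monitor) {
      done = true;
      monitor.notifyAll();
    }
  }

  /** Waits until markDone() is called, or throws an IOException once timeoutMs has elapsed. */
  public void await(long timeoutMs) throws IOException {
    final long deadline = System.currentTimeMillis() + timeoutMs;
    synchronized (monitor) {
      while (!done) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          // Give up instead of blocking the caller forever.
          throw new IOException("Timed out after " + timeoutMs + " ms; sync never completed");
        }
        try {
          // Wake up at least once a second to re-check the deadline.
          monitor.wait(Math.min(1000L, remaining));
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new InterruptedIOException("Interrupted while waiting for sync");
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    final BoundedWaitExample f = new BoundedWaitExample();
    new Thread(new Runnable() {
      public void run() {
        try { Thread.sleep(200); } catch (InterruptedException ignored) { }
        f.markDone(); // simulate the sync completing
      }
    }).start();
    f.await(5000); // returns promptly here; would throw after 5s if markDone() never fired
    System.out.println("sync completed within the timeout");
  }
}

The detail mirrored from the patch is that the deadline is computed once before the loop, so repeated wakeups from wait() cannot stretch the overall wait beyond the configured timeout.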