HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll

This commit is contained in:
Enis Soztutar 2016-10-18 18:46:02 -07:00
parent 6c89c6251f
commit ef8c65e542
2 changed files with 65 additions and 25 deletions

View File

@ -30,15 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -68,6 +59,15 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
/** /**
* The default implementation of FSWAL. * The default implementation of FSWAL.
*/ */
@ -499,6 +499,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private volatile long sequence; private volatile long sequence;
// Keep around last exception thrown. Clear on successful sync. // Keep around last exception thrown. Clear on successful sync.
private final BlockingQueue<SyncFuture> syncFutures; private final BlockingQueue<SyncFuture> syncFutures;
private volatile SyncFuture takeSyncFuture = null;
/** /**
* UPDATE! * UPDATE!
@ -546,6 +547,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
if (!syncFuture.done(currentSequence, t)) { if (!syncFuture.done(currentSequence, t)) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
// This function releases one sync future only. // This function releases one sync future only.
return 1; return 1;
} }
@ -589,13 +591,21 @@ public class FSHLog extends AbstractFSWAL<Writer> {
return sequence; return sequence;
} }
/**
 * Reports whether this runner currently holds no sync work: nothing is waiting in
 * {@code syncFutures} and no taken future is recorded in {@code takeSyncFuture}.
 *
 * NOTE(review): run() dequeues a future with take() and only afterwards assigns it to
 * takeSyncFuture, so there appears to be a brief window in which this method can return
 * true while a future is actually in flight -- confirm whether callers can tolerate that.
 *
 * @return true if the queue is empty and no taken sync future is recorded
 */
boolean areSyncFuturesReleased() {
  // Anything still queued means there is outstanding work.
  if (!syncFutures.isEmpty()) {
    return false;
  }
  // Queue is drained; released only if no taken future is currently recorded.
  return takeSyncFuture == null;
}
public void run() { public void run() {
long currentSequence; long currentSequence;
while (!isInterrupted()) { while (!isInterrupted()) {
int syncCount = 0; int syncCount = 0;
SyncFuture takeSyncFuture;
try { try {
while (true) { while (true) {
takeSyncFuture = null;
// We have to process what we 'take' from the queue // We have to process what we 'take' from the queue
takeSyncFuture = this.syncFutures.take(); takeSyncFuture = this.syncFutures.take();
currentSequence = this.sequence; currentSequence = this.sequence;
@ -975,11 +985,23 @@ public class FSHLog extends AbstractFSWAL<Writer> {
* @return True if there are still outstanding sync futures * @return True if there are still outstanding sync futures
*/ */
private boolean isOutstandingSyncs() { private boolean isOutstandingSyncs() {
// Look at SyncFutures in the EventHandler: scan the first syncFuturesCount slots of the
// syncFutures array and report true as soon as one of them is not yet done.
for (int i = 0; i < this.syncFuturesCount; i++) { for (int i = 0; i < this.syncFuturesCount; i++) {
if (!this.syncFutures[i].isDone()) { if (!this.syncFutures[i].isDone()) {
// Found a sync future that has not completed.
return true; return true;
} }
} }
// Every scanned sync future reports done.
return false;
}
/**
 * Checks the SyncRunners for sync futures that have not been released yet.
 * @return true if any SyncRunner that is alive reports that its sync futures are not all
 *   released; false otherwise
 */
private boolean isOutstandingSyncsFromRunners() {
// Look at SyncFutures in the SyncRunners
for (SyncRunner syncRunner: syncRunners) {
// Only runners that are still alive are consulted; a runner that has queued or taken
// sync futures (per areSyncFuturesReleased()) counts as outstanding.
if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
return true;
}
}
return false; return false;
} }
@ -1095,11 +1117,13 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// Wait on outstanding syncers; wait for them to finish syncing (unless we've been // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
// shutdown or unless our latch has been thrown because we have been aborted or unless // shutdown or unless our latch has been thrown because we have been aborted or unless
// this WAL is broken and we can't get a sync/append to complete). // this WAL is broken and we can't get a sync/append to complete).
while (!this.shutdown && this.zigzagLatch.isCocked() while ((!this.shutdown && this.zigzagLatch.isCocked()
&& highestSyncedTxid.get() < currentSequence && && highestSyncedTxid.get() < currentSequence &&
// We could be in here and all syncs are failing or failed. Check for this. Otherwise // We could be in here and all syncs are failing or failed. Check for this. Otherwise
// we'll just be stuck here for ever. In other words, ensure there are syncs running. // we'll just be stuck here for ever. In other words, ensure there are syncs running.
isOutstandingSyncs()) { isOutstandingSyncs())
// Wait for all SyncRunners to finish their work so that we can replace the writer
|| isOutstandingSyncsFromRunners()) {
synchronized (this.safePointWaiter) { synchronized (this.safePointWaiter) {
this.safePointWaiter.wait(0, 1); this.safePointWaiter.wait(0, 1);
} }

View File

@ -18,11 +18,10 @@
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import java.io.IOException; import java.io.IOException;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKey;
@ -56,7 +56,18 @@ public class TestLogRollingNoCluster {
withLookingForStuckThread(true).build(); withLookingForStuckThread(true).build();
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] EMPTY_1K_ARRAY = new byte[1024]; private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
private static final int THREAD_COUNT = 100; // Spin up this many threads private static final int NUM_THREADS = 100; // Spin up this many threads
private static final int NUM_ENTRIES = 100; // How many entries to write
/**
 * A {@link ProtobufLogWriter} whose {@code sync()} is padded with a short random sleep both
 * before and after the real sync, to simulate a higher-latency sync call.
 */
public static class HighLatencySyncWriter extends ProtobufLogWriter {
  /** Exclusive upper bound handed to {@code nextInt} for each injected sleep. */
  private static final int MAX_INJECTED_DELAY = 10;

  @Override
  public void sync() throws IOException {
    injectRandomDelay();
    super.sync();
    injectRandomDelay();
  }

  /** Sleeps for a random duration drawn from {@code nextInt(MAX_INJECTED_DELAY)}. */
  private static void injectRandomDelay() {
    Threads.sleep(ThreadLocalRandom.current().nextInt(MAX_INJECTED_DELAY));
  }
}
/** /**
* Spin up a bunch of threads and have them all append to a WAL. Roll the * Spin up a bunch of threads and have them all append to a WAL. Roll the
@ -65,38 +76,42 @@ public class TestLogRollingNoCluster {
* @throws InterruptedException * @throws InterruptedException
*/ */
@Test @Test
public void testContendedLogRolling() throws IOException, InterruptedException { public void testContendedLogRolling() throws Exception {
Path dir = TEST_UTIL.getDataTestDir(); TEST_UTIL.startMiniDFSCluster(3);
Path dir = TEST_UTIL.getDataTestDirOnTestFS();
// The implementation needs to know the 'handler' count. // The implementation needs to know the 'handler' count.
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT); TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
final Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.set(WALFactory.WAL_PROVIDER, "filesystem"); conf.set(WALFactory.WAL_PROVIDER, "filesystem");
FSUtils.setRootDir(conf, dir); FSUtils.setRootDir(conf, dir);
// Register HighLatencySyncWriter under the writer-impl key so that sync() calls made through
// this WAL go through the artificially slowed writer.
conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName()); final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
final WAL wal = wals.getWAL(new byte[]{}, null); final WAL wal = wals.getWAL(new byte[]{}, null);
Appender [] appenders = null; Appender [] appenders = null;
final int count = THREAD_COUNT; final int numThreads = NUM_THREADS;
appenders = new Appender[count]; appenders = new Appender[numThreads];
try { try {
// Build one Appender per thread before starting any of them.
for (int i = 0; i < count; i++) { for (int i = 0; i < numThreads; i++) {
// Have each appending thread write 'count' entries // Have each appending thread write 'count' entries
// NOTE(review): the newer version passes NUM_ENTRIES here rather than the thread count,
// so the "'count' entries" wording above is stale for it.
appenders[i] = new Appender(wal, i, count); appenders[i] = new Appender(wal, i, NUM_ENTRIES);
} }
for (int i = 0; i < count; i++) { for (int i = 0; i < numThreads; i++) {
appenders[i].start(); appenders[i].start();
} }
for (int i = 0; i < count; i++) { for (int i = 0; i < numThreads; i++) {
//ensure that all threads are joined before closing the wal //ensure that all threads are joined before closing the wal
appenders[i].join(); appenders[i].join();
} }
} finally { } finally {
wals.close(); wals.close();
} }
// The test passes only if no appender recorded an exception.
for (int i = 0; i < count; i++) { for (int i = 0; i < numThreads; i++) {
assertFalse(appenders[i].isException()); assertFalse(appenders[i].isException());
} }
// NOTE(review): this shutdown is not inside a finally block, so it is skipped when an
// assertion above fails or an exception propagates -- consider moving it into a finally.
TEST_UTIL.shutdownMiniDFSCluster();
} }
/** /**
@ -149,6 +164,7 @@ public class TestLogRollingNoCluster {
} }
final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true); TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true);
Threads.sleep(ThreadLocalRandom.current().nextInt(5));
wal.sync(txid); wal.sync(txid);
} }
String msg = getName() + " finished"; String msg = getName() + " finished";