HBASE-17053 Remove LogRollerExitedChecker

This commit is contained in:
zhangduo 2016-11-09 21:07:07 +08:00
parent 8192a6b6ee
commit e5a288e5c0
9 changed files with 20 additions and 176 deletions

View File

@ -375,8 +375,6 @@ public class HRegionServer extends HasThread implements
// WAL roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
final LogRoller walRoller;
// Lazily initialized if this RegionServer hosts a meta table.
final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
// flag set after we're done setting up server threads
final AtomicBoolean online = new AtomicBoolean(false);
@ -1722,34 +1720,6 @@ public class HRegionServer extends HasThread implements
return new WALFactory(conf, listeners, serverName.toString());
}
/**
* We initialize the roller for the wal that handles meta lazily
* since we don't know if this regionserver will handle it. All calls to
* this method return a reference to the that same roller. As newly referenced
* meta regions are brought online, they will be offered to the roller for maintenance.
* As a part of that registration process, the roller will add itself as a
* listener on the wal.
*/
protected LogRoller ensureMetaWALRoller() {
// Using a tmp log roller to ensure metaLogRoller is alive once it is not
// null
LogRoller roller = metawalRoller.get();
if (null == roller) {
LogRoller tmpLogRoller = new LogRoller(this, this);
String n = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
n + "-MetaLogRoller", uncaughtExceptionHandler);
if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
roller = tmpLogRoller;
} else {
// Another thread won starting the roller
Threads.shutdown(tmpLogRoller.getThread());
roller = metawalRoller.get();
}
}
return roller;
}
public MetricsRegionServer getRegionServerMetrics() {
return this.metricsRegionServer;
}
@ -1914,11 +1884,6 @@ public class HRegionServer extends HasThread implements
stop("One or more threads are no longer alive -- stop");
return false;
}
final LogRoller metawalRoller = this.metawalRoller.get();
if (metawalRoller != null && !metawalRoller.isAlive()) {
stop("Meta WAL roller thread is no longer alive -- stop");
return false;
}
return true;
}
@ -1932,11 +1897,9 @@ public class HRegionServer extends HasThread implements
@Override
public WAL getWAL(HRegionInfo regionInfo) throws IOException {
WAL wal;
LogRoller roller = walRoller;
//_ROOT_ and hbase:meta regions have separate WAL.
if (regionInfo != null && regionInfo.isMetaTable() &&
regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
roller = ensureMetaWALRoller();
// _ROOT_ and hbase:meta regions have separate WAL.
if (regionInfo != null && regionInfo.isMetaTable()
&& regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
} else if (regionInfo == null) {
wal = walFactory.getWAL(UNSPECIFIED_REGION, null);
@ -1944,7 +1907,7 @@ public class HRegionServer extends HasThread implements
byte[] namespace = regionInfo.getTable().getNamespace();
wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace);
}
roller.addWAL(wal);
walRoller.addWAL(wal);
return wal;
}
@ -2330,11 +2293,7 @@ public class HRegionServer extends HasThread implements
this.spanReceiverHost.closeReceivers();
}
if (this.walRoller != null) {
Threads.shutdown(this.walRoller.getThread());
}
final LogRoller metawalRoller = this.metawalRoller.get();
if (metawalRoller != null) {
Threads.shutdown(metawalRoller.getThread());
this.walRoller.close();
}
if (this.compactSplitThread != null) {
this.compactSplitThread.join();

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@ -49,7 +50,7 @@ import com.google.common.annotations.VisibleForTesting;
*/
@InterfaceAudience.Private
@VisibleForTesting
public class LogRoller extends HasThread {
public class LogRoller extends HasThread implements Closeable {
private static final Log LOG = LogFactory.getLog(LogRoller.class);
private final ReentrantLock rollLock = new ReentrantLock();
private final AtomicBoolean rollLog = new AtomicBoolean(false);
@ -62,6 +63,8 @@ public class LogRoller extends HasThread {
private final long rollperiod;
private final int threadWakeFrequency;
private volatile boolean running = true;
public void addWAL(final WAL wal) {
if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
wal.registerWALActionsListener(new WALActionsListener.Base() {
@ -110,7 +113,7 @@ public class LogRoller extends HasThread {
@Override
public void run() {
while (!server.isStopped()) {
while (running) {
long now = System.currentTimeMillis();
boolean periodic = false;
if (!rollLog.get()) {
@ -167,9 +170,6 @@ public class LogRoller extends HasThread {
}
}
}
for (WAL wal : walNeedsRoll.keySet()) {
wal.logRollerExited();
}
LOG.info("LogRoller exiting.");
}
@ -208,4 +208,10 @@ public class LogRoller extends HasThread {
}
return true;
}
@Override
public void close() {
running = false;
interrupt();
}
}

View File

@ -25,7 +25,6 @@ import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.io.IOException;
@ -40,7 +39,6 @@ import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@ -143,10 +141,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries";
public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;
public static final String ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS =
"hbase.wal.async.logroller.exited.check.interval.ms";
public static final long DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = 1000;
private final EventLoop eventLoop;
private final Lock consumeLock = new ReentrantLock();
@ -176,8 +170,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final int createMaxRetries;
private final long logRollerExitedCheckIntervalMs;
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
@ -196,85 +188,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// file length when we issue last sync request on the writer
private long fileLengthAtLastSync;
private volatile boolean logRollerExited;
private final class LogRollerExitedChecker implements Runnable {
private boolean cancelled;
private ScheduledFuture<?> future;
public synchronized void setFuture(ScheduledFuture<?> future) {
this.future = future;
}
// See the comments in syncFailed why we need to do this.
private void cleanup() {
unackedAppends.clear();
toWriteAppends.forEach(entry -> {
try {
entry.stampRegionSequenceId();
} catch (IOException e) {
throw new AssertionError("should not happen", e);
}
});
toWriteAppends.clear();
IOException error = new IOException("sync failed but log roller exited");
for (SyncFuture sync; (sync = syncFutures.peek()) != null;) {
sync.done(sync.getTxid(), error);
syncFutures.remove();
}
long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
for (long cursorBound =
waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) {
if (!waitingConsumePayloads.isPublished(nextCursor)) {
break;
}
RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
switch (truck.type()) {
case APPEND:
try {
truck.unloadAppend().stampRegionSequenceId();
} catch (IOException e) {
throw new AssertionError("should not happen", e);
}
break;
case SYNC:
SyncFuture sync = truck.unloadSync();
sync.done(sync.getTxid(), error);
break;
default:
LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
break;
}
waitingConsumePayloadsGatingSequence.set(nextCursor);
}
}
@Override
public void run() {
if (!logRollerExited) {
return;
}
// rollWriter is called in the log roller thread, and logRollerExited will be set just before
// the log rolled exit. So here we can confirm that no one could cancel us if the 'canceled'
// check passed. So it is safe to release the lock after checking 'canceled' flag.
synchronized (this) {
if (cancelled) {
return;
}
}
cleanup();
}
public synchronized void cancel() {
future.cancel(false);
cancelled = true;
}
}
private LogRollerExitedChecker logRollerExitedChecker;
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix, EventLoop eventLoop)
@ -312,8 +225,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
createMaxRetries =
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS,
DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS);
rollWriter();
}
@ -357,14 +268,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
if (writerBroken) {
return;
}
// schedule a periodical task to check if log roller is exited. Otherwise the the sync
// request maybe blocked forever since we are still waiting for a new writer to write the
// pending data and sync it...
logRollerExitedChecker = new LogRollerExitedChecker();
// we are currently in the EventLoop thread, so it is safe to set the future after
// schedule it since the task can not be executed before we release the thread.
logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS));
writerBroken = true;
if (waitingRoll) {
readyForRolling = true;
@ -707,11 +610,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
@Override
public void logRollerExited() {
logRollerExited = true;
}
@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
boolean overwrite = false;
@ -779,10 +677,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
try {
consumerScheduled.set(true);
writerBroken = waitingRoll = false;
if (logRollerExitedChecker != null) {
logRollerExitedChecker.cancel();
logRollerExitedChecker = null;
}
eventLoop.execute(consumer);
} finally {
consumeLock.unlock();

View File

@ -795,10 +795,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
}
@Override
public void logRollerExited() {
}
@VisibleForTesting
boolean isLowReplicationRollEnabled() {
return lowReplicationRollEnabled;

View File

@ -228,10 +228,6 @@ class DisabledWALProvider implements WALProvider {
public String toString() {
return "WAL disabled.";
}
@Override
public void logRollerExited() {
}
}
@Override

View File

@ -211,13 +211,6 @@ public interface WAL extends Closeable {
@Override
String toString();
/**
* In some WAL implementation, we will write WAL entries to new file if sync failed, which means,
* the fail recovery is depended on log roller. So here we tell the WAL that log roller has
* already been exited so the WAL cloud give up recovery.
*/
void logRollerExited();
/**
* When outside clients need to consume persisted WALs, they rely on a provided
* Reader.

View File

@ -245,7 +245,7 @@ public class TestFailedAppendAndSync {
} finally {
// To stop logRoller, its server has to say it is stopped.
Mockito.when(server.isStopped()).thenReturn(true);
if (logRoller != null) logRoller.interrupt();
if (logRoller != null) logRoller.close();
if (region != null) {
try {
region.close(true);

View File

@ -285,7 +285,7 @@ public class TestWALLockup {
} finally {
// To stop logRoller, its server has to say it is stopped.
Mockito.when(server.isStopped()).thenReturn(true);
if (logRoller != null) logRoller.interrupt();
if (logRoller != null) logRoller.close();
try {
if (region != null) region.close();
if (dodgyWAL != null) dodgyWAL.close();
@ -469,7 +469,7 @@ public class TestWALLockup {
assertTrue(server.isAborted());
} finally {
if (logRoller != null) {
logRoller.interrupt();
logRoller.close();
}
try {
if (region != null) {

View File

@ -382,7 +382,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
}
if (null != roller) {
LOG.info("shutting down log roller.");
Threads.shutdown(roller.getThread());
roller.close();
}
wals.shutdown();
// Remove the root dir for this test region