diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 55c5219ca78..464f51baef0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.Closeable;
import java.io.IOException;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-
+import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
@@ -37,6 +39,7 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@@ -52,43 +55,47 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
@VisibleForTesting
public class LogRoller extends HasThread implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
- private final ReentrantLock rollLock = new ReentrantLock();
- private final AtomicBoolean rollLog = new AtomicBoolean(false);
- private final ConcurrentHashMap walNeedsRoll = new ConcurrentHashMap<>();
+ private final ConcurrentMap walNeedsRoll = new ConcurrentHashMap<>();
private final Server server;
protected final RegionServerServices services;
- private volatile long lastrolltime = System.currentTimeMillis();
+ private volatile long lastRollTime = System.currentTimeMillis();
// Period to roll log.
- private final long rollperiod;
+ private final long rollPeriod;
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
private long checkLowReplicationInterval;
private volatile boolean running = true;
- public void addWAL(final WAL wal) {
- if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
- wal.registerWALActionsListener(new WALActionsListener() {
- @Override
- public void logRollRequested(boolean lowReplicas) {
- walNeedsRoll.put(wal, Boolean.TRUE);
- // TODO logs will contend with each other here, replace with e.g. DelayedQueue
- synchronized(rollLog) {
- rollLog.set(true);
- rollLog.notifyAll();
+ public void addWAL(WAL wal) {
+ // check without lock first
+ if (walNeedsRoll.containsKey(wal)) {
+ return;
+ }
+ // this is to avoid race between addWAL and requestRollAll.
+ synchronized (this) {
+ if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
+ wal.registerWALActionsListener(new WALActionsListener() {
+ @Override
+ public void logRollRequested(boolean lowReplicas) {
+ // TODO logs will contend with each other here, replace with e.g. DelayedQueue
+ synchronized (LogRoller.this) {
+ walNeedsRoll.put(wal, Boolean.TRUE);
+ LogRoller.this.notifyAll();
+ }
}
- }
- });
+ });
+ }
}
}
public void requestRollAll() {
- for (WAL wal : walNeedsRoll.keySet()) {
- walNeedsRoll.put(wal, Boolean.TRUE);
- }
- synchronized(rollLog) {
- rollLog.set(true);
- rollLog.notifyAll();
+ synchronized (this) {
+ List wals = new ArrayList(walNeedsRoll.keySet());
+ for (WAL wal : wals) {
+ walNeedsRoll.put(wal, Boolean.TRUE);
+ }
+ notifyAll();
}
}
@@ -97,7 +104,7 @@ public class LogRoller extends HasThread implements Closeable {
super("LogRoller");
this.server = server;
this.services = services;
- this.rollperiod = this.server.getConfiguration().
+ this.rollPeriod = this.server.getConfiguration().
getLong("hbase.regionserver.logroll.period", 3600000);
this.threadWakeFrequency = this.server.getConfiguration().
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
@@ -105,19 +112,10 @@ public class LogRoller extends HasThread implements Closeable {
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
}
- @Override
- public void interrupt() {
- // Wake up if we are waiting on rollLog. For tests.
- synchronized (rollLog) {
- this.rollLog.notify();
- }
- super.interrupt();
- }
-
/**
* we need to check low replication in period, see HBASE-18132
*/
- void checkLowReplication(long now) {
+ private void checkLowReplication(long now) {
try {
for (Entry entry : walNeedsRoll.entrySet()) {
WAL wal = entry.getKey();
@@ -152,47 +150,49 @@ public class LogRoller extends HasThread implements Closeable {
@Override
public void run() {
while (running) {
+ boolean periodic = false;
long now = System.currentTimeMillis();
checkLowReplication(now);
- boolean periodic = false;
- if (!rollLog.get()) {
- periodic = (now - this.lastrolltime) > this.rollperiod;
- if (!periodic) {
- synchronized (rollLog) {
- try {
- if (!rollLog.get()) {
- rollLog.wait(this.threadWakeFrequency);
- }
- } catch (InterruptedException e) {
- // Fall through
- }
- }
- continue;
- }
- // Time for periodic roll
- LOG.debug("Wal roll period {} ms elapsed", this.rollperiod);
+ periodic = (now - this.lastRollTime) > this.rollPeriod;
+ if (periodic) {
+ // Time for periodic roll, fall through
+ LOG.debug("Wal roll period {} ms elapsed", this.rollPeriod);
} else {
- LOG.debug("WAL roll requested");
+ synchronized (this) {
+ if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
+ // WAL roll requested, fall through
+ LOG.debug("WAL roll requested");
+ } else {
+ try {
+ wait(this.threadWakeFrequency);
+ } catch (InterruptedException e) {
+ // restore the interrupt state
+ Thread.currentThread().interrupt();
+ }
+ // goto the beginning to check whether again whether we should fall through to roll
+ // several WALs, and also check whether we should quit.
+ continue;
+ }
+ }
}
- rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
try {
- this.lastrolltime = now;
- for (Entry entry : walNeedsRoll.entrySet()) {
- final WAL wal = entry.getKey();
- // Force the roll if the logroll.period is elapsed or if a roll was requested.
- // The returned value is an array of actual region names.
- final byte [][] regionsToFlush = wal.rollWriter(periodic ||
- entry.getValue().booleanValue());
+ this.lastRollTime = System.currentTimeMillis();
+ for (Iterator> iter = walNeedsRoll.entrySet().iterator(); iter
+ .hasNext();) {
+ Entry entry = iter.next();
+ WAL wal = entry.getKey();
+ // reset the flag in front to avoid missing roll request before we return from rollWriter.
walNeedsRoll.put(wal, Boolean.FALSE);
+ // Force the roll if the logroll.period is elapsed or if a roll was requested.
+ // The returned value is an array of actual region names.
+ byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
if (regionsToFlush != null) {
for (byte[] r : regionsToFlush) {
- scheduleFlush(r);
+ scheduleFlush(Bytes.toString(r));
}
}
}
- } catch (FailedLogCloseException e) {
- abort("Failed log close in log roller", e);
- } catch (java.net.ConnectException e) {
+ } catch (FailedLogCloseException | ConnectException e) {
abort("Failed log close in log roller", e);
} catch (IOException ex) {
// Abort if we get here. We probably won't recover an IOE. HBASE-1132
@@ -201,12 +201,6 @@ public class LogRoller extends HasThread implements Closeable {
} catch (Exception ex) {
LOG.error("Log rolling failed", ex);
abort("Log rolling failed", ex);
- } finally {
- try {
- rollLog.set(false);
- } finally {
- rollLock.unlock();
- }
}
}
LOG.info("LogRoller exiting.");
@@ -215,22 +209,20 @@ public class LogRoller extends HasThread implements Closeable {
/**
* @param encodedRegionName Encoded name of region to flush.
*/
- private void scheduleFlush(final byte [] encodedRegionName) {
- boolean scheduled = false;
- HRegion r = (HRegion) this.services.getRegion(Bytes.toString(encodedRegionName));
- FlushRequester requester = null;
- if (r != null) {
- requester = this.services.getFlushRequester();
- if (requester != null) {
- // force flushing all stores to clean old logs
- requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
- scheduled = true;
- }
+ private void scheduleFlush(String encodedRegionName) {
+ HRegion r = (HRegion) this.services.getRegion(encodedRegionName);
+ if (r == null) {
+ LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName);
+ return;
}
- if (!scheduled) {
- LOG.warn("Failed to schedule flush of {}, region={}, requester={}",
- Bytes.toString(encodedRegionName), r, requester);
+ FlushRequester requester = this.services.getFlushRequester();
+ if (requester == null) {
+ LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null",
+ encodedRegionName, r);
+ return;
}
+ // force flushing all stores to clean old logs
+ requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
}
/**
@@ -239,12 +231,7 @@ public class LogRoller extends HasThread implements Closeable {
*/
@VisibleForTesting
public boolean walRollFinished() {
- for (boolean needRoll : walNeedsRoll.values()) {
- if (needRoll) {
- return false;
- }
- }
- return true;
+ return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index c9c17ea1e7f..43f15120548 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -280,6 +280,8 @@ public abstract class AbstractFSWAL implements WAL {
*/
protected final String implClassName;
+ protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
+
public long getFilenum() {
return this.filenum.get();
}
@@ -681,11 +683,8 @@ public abstract class AbstractFSWAL implements WAL {
}
/**
- *
- * Cleans up current writer closing it and then puts in place the passed in
- * nextWriter
.
- *
- *
+ * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
+ *
*
* - In the case of creating a new WAL, oldPath will be null.
* - In the case of rolling over from one file to the next, none of the parameters will be null.
@@ -693,7 +692,6 @@ public abstract class AbstractFSWAL implements WAL {
*
- In the case of closing out this FSHLog with no further use newPath and nextWriter will be
* null.
*
- *
* @param oldPath may be null
* @param newPath may be null
* @param nextWriter may be null
@@ -875,8 +873,14 @@ public abstract class AbstractFSWAL implements WAL {
return cachedSyncFutures.get().reset(sequence);
}
+ protected boolean isLogRollRequested() {
+ return rollRequested.get();
+ }
+
protected final void requestLogRoll(boolean tooFewReplicas) {
- if (!this.listeners.isEmpty()) {
+ // If we have already requested a roll, don't do it again
+ // And only set rollRequested to true when there is a registered listener
+ if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
for (WALActionsListener i : this.listeners) {
i.logRollRequested(tooFewReplicas);
}
@@ -1031,6 +1035,13 @@ public abstract class AbstractFSWAL implements WAL {
protected abstract W createWriterInstance(Path path)
throws IOException, CommonFSUtils.StreamLacksCapabilityException;
+ /**
+ * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer
+ * will begin to work before returning from this method. If we clear the flag after returning from
+ * this call, we may miss a roll request. The implementation class should choose a proper place to
+ * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you
+ * start writing to the new writer.
+ */
protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
throws IOException;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 5699b3d2f99..79409a0245a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -167,9 +167,6 @@ public class AsyncFSWAL extends AbstractFSWAL {
// notice that, modification to this field is only allowed under the protection of consumeLock.
private volatile int epochAndState;
- // used to guard the log roll request when we exceed the log roll size.
- private boolean rollRequested;
-
private boolean readyForRolling;
private final Condition readyForRollingCond = consumeLock.newCondition();
@@ -336,10 +333,9 @@ public class AsyncFSWAL extends AbstractFSWAL {
// closed soon.
return;
}
- if (writer.getLength() < logrollsize || rollRequested) {
+ if (writer.getLength() < logrollsize || isLogRollRequested()) {
return;
}
- rollRequested = true;
requestLogRoll();
}
@@ -666,7 +662,6 @@ public class AsyncFSWAL extends AbstractFSWAL {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
}
this.fileLengthAtLastSync = nextWriter.getLength();
- this.rollRequested = false;
this.highestProcessedAppendTxidAtLastSync = 0L;
consumeLock.lock();
try {
@@ -675,6 +670,8 @@ public class AsyncFSWAL extends AbstractFSWAL {
int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
// set a new epoch and also clear waitingRoll and writerBroken
this.epochAndState = nextEpoch << 2;
+ // Reset rollRequested status
+ rollRequested.set(false);
consumeExecutor.execute(consumer);
} finally {
consumeLock.unlock();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 4681c290dfd..b77210ed481 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -381,6 +381,8 @@ public class FSHLog extends AbstractFSWAL {
} finally {
// Let the writer thread go regardless, whether error or not.
if (zigzagLatch != null) {
+ // Reset rollRequested status
+ rollRequested.set(false);
zigzagLatch.releaseSafePoint();
// syncFuture will be null if we failed our wait on safe point above. Otherwise, if
// latch was obtained successfully, the sync we threw in either trigger the latch or it
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index cf858b1f2f1..20cefea34c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -436,7 +436,7 @@ public class TestWALLockup {
// To stop logRoller, its server has to say it is stopped.
Mockito.when(server.isStopped()).thenReturn(true);
if (logRoller != null) {
- logRoller.interrupt();
+ logRoller.close();
}
if (dodgyWAL != null) {
dodgyWAL.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
index 399cdc4162f..93024288e70 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -17,18 +17,49 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.LogRoller;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SequenceId;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
@@ -39,7 +70,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
/**
* Provides AsyncFSWAL test cases.
*/
-@Category({ RegionServerTests.class, MediumTests.class })
+@Category({ RegionServerTests.class, LargeTests.class })
public class TestAsyncFSWAL extends AbstractTestFSWAL {
@ClassRule
@@ -90,4 +121,101 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
asyncFSWAL.init();
return asyncFSWAL;
}
+
+ @Test
+ public void testBrokenWriter() throws Exception {
+ Server server = mock(Server.class);
+ when(server.getConfiguration()).thenReturn(CONF);
+ RegionServerServices services = mock(RegionServerServices.class);
+ TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
+ RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+ NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (byte[] fam : td.getColumnFamilyNames()) {
+ scopes.put(fam, 0);
+ }
+ long timestamp = System.currentTimeMillis();
+ String testName = currentTest.getMethodName();
+ AtomicInteger failedCount = new AtomicInteger(0);
+ try (LogRoller roller = new LogRoller(server, services);
+ AsyncFSWAL wal = new AsyncFSWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
+ testName, CONF, null, true, null, null, GROUP, CHANNEL_CLASS) {
+
+ @Override
+ protected AsyncWriter createWriterInstance(Path path) throws IOException {
+ AsyncWriter writer = super.createWriterInstance(path);
+ return new AsyncWriter() {
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ @Override
+ public long getLength() {
+ return writer.getLength();
+ }
+
+ @Override
+ public CompletableFuture sync() {
+ CompletableFuture result = writer.sync();
+ if (failedCount.incrementAndGet() < 1000) {
+ CompletableFuture future = new CompletableFuture<>();
+ FutureUtils.addListener(result,
+ (r, e) -> future.completeExceptionally(new IOException("Inject Error")));
+ return future;
+ } else {
+ return result;
+ }
+ }
+
+ @Override
+ public void append(Entry entry) {
+ writer.append(entry);
+ }
+ };
+ }
+ }) {
+ wal.init();
+ roller.addWAL(wal);
+ roller.start();
+ int numThreads = 10;
+ AtomicReference error = new AtomicReference<>();
+ Thread[] threads = new Thread[numThreads];
+ for (int i = 0; i < 10; i++) {
+ final int index = i;
+ threads[index] = new Thread("Write-Thread-" + index) {
+
+ @Override
+ public void run() {
+ byte[] row = Bytes.toBytes("row" + index);
+ WALEdit cols = new WALEdit();
+ cols.add(new KeyValue(row, row, row, timestamp + index, row));
+ WALKeyImpl key = new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(),
+ SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
+ HConstants.NO_NONCE, mvcc, scopes);
+ try {
+ wal.append(ri, key, cols, true);
+ } catch (IOException e) {
+ // should not happen
+ throw new UncheckedIOException(e);
+ }
+ try {
+ wal.sync();
+ } catch (IOException e) {
+ error.set(e);
+ }
+ }
+ };
+ }
+ for (Thread t : threads) {
+ t.start();
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
+ assertNull(error.get());
+ }
+ }
}