HBASE-22684 The log rolling request maybe canceled immediately in LogRoller due to a race (#378)
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
88ab7acbca
commit
1092533308
|
@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.ConnectException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||||
|
@ -37,6 +39,7 @@ import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -52,43 +55,47 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public class LogRoller extends HasThread implements Closeable {
|
public class LogRoller extends HasThread implements Closeable {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
|
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
|
||||||
private final ReentrantLock rollLock = new ReentrantLock();
|
private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
|
||||||
private final AtomicBoolean rollLog = new AtomicBoolean(false);
|
|
||||||
private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
|
|
||||||
private final Server server;
|
private final Server server;
|
||||||
protected final RegionServerServices services;
|
protected final RegionServerServices services;
|
||||||
private volatile long lastrolltime = System.currentTimeMillis();
|
private volatile long lastRollTime = System.currentTimeMillis();
|
||||||
// Period to roll log.
|
// Period to roll log.
|
||||||
private final long rollperiod;
|
private final long rollPeriod;
|
||||||
private final int threadWakeFrequency;
|
private final int threadWakeFrequency;
|
||||||
// The interval to check low replication on hlog's pipeline
|
// The interval to check low replication on hlog's pipeline
|
||||||
private long checkLowReplicationInterval;
|
private long checkLowReplicationInterval;
|
||||||
|
|
||||||
private volatile boolean running = true;
|
private volatile boolean running = true;
|
||||||
|
|
||||||
public void addWAL(final WAL wal) {
|
public void addWAL(WAL wal) {
|
||||||
if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
|
// check without lock first
|
||||||
wal.registerWALActionsListener(new WALActionsListener() {
|
if (walNeedsRoll.containsKey(wal)) {
|
||||||
@Override
|
return;
|
||||||
public void logRollRequested(boolean lowReplicas) {
|
}
|
||||||
walNeedsRoll.put(wal, Boolean.TRUE);
|
// this is to avoid race between addWAL and requestRollAll.
|
||||||
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
|
synchronized (this) {
|
||||||
synchronized(rollLog) {
|
if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
|
||||||
rollLog.set(true);
|
wal.registerWALActionsListener(new WALActionsListener() {
|
||||||
rollLog.notifyAll();
|
@Override
|
||||||
|
public void logRollRequested(boolean lowReplicas) {
|
||||||
|
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
|
||||||
|
synchronized (LogRoller.this) {
|
||||||
|
walNeedsRoll.put(wal, Boolean.TRUE);
|
||||||
|
LogRoller.this.notifyAll();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void requestRollAll() {
|
public void requestRollAll() {
|
||||||
for (WAL wal : walNeedsRoll.keySet()) {
|
synchronized (this) {
|
||||||
walNeedsRoll.put(wal, Boolean.TRUE);
|
List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
|
||||||
}
|
for (WAL wal : wals) {
|
||||||
synchronized(rollLog) {
|
walNeedsRoll.put(wal, Boolean.TRUE);
|
||||||
rollLog.set(true);
|
}
|
||||||
rollLog.notifyAll();
|
notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,7 +104,7 @@ public class LogRoller extends HasThread implements Closeable {
|
||||||
super("LogRoller");
|
super("LogRoller");
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.services = services;
|
this.services = services;
|
||||||
this.rollperiod = this.server.getConfiguration().
|
this.rollPeriod = this.server.getConfiguration().
|
||||||
getLong("hbase.regionserver.logroll.period", 3600000);
|
getLong("hbase.regionserver.logroll.period", 3600000);
|
||||||
this.threadWakeFrequency = this.server.getConfiguration().
|
this.threadWakeFrequency = this.server.getConfiguration().
|
||||||
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
|
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
|
||||||
|
@ -105,19 +112,10 @@ public class LogRoller extends HasThread implements Closeable {
|
||||||
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
|
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void interrupt() {
|
|
||||||
// Wake up if we are waiting on rollLog. For tests.
|
|
||||||
synchronized (rollLog) {
|
|
||||||
this.rollLog.notify();
|
|
||||||
}
|
|
||||||
super.interrupt();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* we need to check low replication in period, see HBASE-18132
|
* we need to check low replication in period, see HBASE-18132
|
||||||
*/
|
*/
|
||||||
void checkLowReplication(long now) {
|
private void checkLowReplication(long now) {
|
||||||
try {
|
try {
|
||||||
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
|
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
|
||||||
WAL wal = entry.getKey();
|
WAL wal = entry.getKey();
|
||||||
|
@ -152,47 +150,49 @@ public class LogRoller extends HasThread implements Closeable {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while (running) {
|
while (running) {
|
||||||
|
boolean periodic = false;
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
checkLowReplication(now);
|
checkLowReplication(now);
|
||||||
boolean periodic = false;
|
periodic = (now - this.lastRollTime) > this.rollPeriod;
|
||||||
if (!rollLog.get()) {
|
if (periodic) {
|
||||||
periodic = (now - this.lastrolltime) > this.rollperiod;
|
// Time for periodic roll, fall through
|
||||||
if (!periodic) {
|
LOG.debug("Wal roll period {} ms elapsed", this.rollPeriod);
|
||||||
synchronized (rollLog) {
|
|
||||||
try {
|
|
||||||
if (!rollLog.get()) {
|
|
||||||
rollLog.wait(this.threadWakeFrequency);
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// Fall through
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// Time for periodic roll
|
|
||||||
LOG.debug("Wal roll period {} ms elapsed", this.rollperiod);
|
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("WAL roll requested");
|
synchronized (this) {
|
||||||
|
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
|
||||||
|
// WAL roll requested, fall through
|
||||||
|
LOG.debug("WAL roll requested");
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
wait(this.threadWakeFrequency);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// restore the interrupt state
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
// goto the beginning to check whether again whether we should fall through to roll
|
||||||
|
// several WALs, and also check whether we should quit.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
|
|
||||||
try {
|
try {
|
||||||
this.lastrolltime = now;
|
this.lastRollTime = System.currentTimeMillis();
|
||||||
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
|
for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
|
||||||
final WAL wal = entry.getKey();
|
.hasNext();) {
|
||||||
// Force the roll if the logroll.period is elapsed or if a roll was requested.
|
Entry<WAL, Boolean> entry = iter.next();
|
||||||
// The returned value is an array of actual region names.
|
WAL wal = entry.getKey();
|
||||||
final byte [][] regionsToFlush = wal.rollWriter(periodic ||
|
// reset the flag in front to avoid missing roll request before we return from rollWriter.
|
||||||
entry.getValue().booleanValue());
|
|
||||||
walNeedsRoll.put(wal, Boolean.FALSE);
|
walNeedsRoll.put(wal, Boolean.FALSE);
|
||||||
|
// Force the roll if the logroll.period is elapsed or if a roll was requested.
|
||||||
|
// The returned value is an array of actual region names.
|
||||||
|
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
|
||||||
if (regionsToFlush != null) {
|
if (regionsToFlush != null) {
|
||||||
for (byte[] r : regionsToFlush) {
|
for (byte[] r : regionsToFlush) {
|
||||||
scheduleFlush(r);
|
scheduleFlush(Bytes.toString(r));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (FailedLogCloseException e) {
|
} catch (FailedLogCloseException | ConnectException e) {
|
||||||
abort("Failed log close in log roller", e);
|
|
||||||
} catch (java.net.ConnectException e) {
|
|
||||||
abort("Failed log close in log roller", e);
|
abort("Failed log close in log roller", e);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
// Abort if we get here. We probably won't recover an IOE. HBASE-1132
|
// Abort if we get here. We probably won't recover an IOE. HBASE-1132
|
||||||
|
@ -201,12 +201,6 @@ public class LogRoller extends HasThread implements Closeable {
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
LOG.error("Log rolling failed", ex);
|
LOG.error("Log rolling failed", ex);
|
||||||
abort("Log rolling failed", ex);
|
abort("Log rolling failed", ex);
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
rollLog.set(false);
|
|
||||||
} finally {
|
|
||||||
rollLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info("LogRoller exiting.");
|
LOG.info("LogRoller exiting.");
|
||||||
|
@ -215,22 +209,20 @@ public class LogRoller extends HasThread implements Closeable {
|
||||||
/**
|
/**
|
||||||
* @param encodedRegionName Encoded name of region to flush.
|
* @param encodedRegionName Encoded name of region to flush.
|
||||||
*/
|
*/
|
||||||
private void scheduleFlush(final byte [] encodedRegionName) {
|
private void scheduleFlush(String encodedRegionName) {
|
||||||
boolean scheduled = false;
|
HRegion r = (HRegion) this.services.getRegion(encodedRegionName);
|
||||||
HRegion r = (HRegion) this.services.getRegion(Bytes.toString(encodedRegionName));
|
if (r == null) {
|
||||||
FlushRequester requester = null;
|
LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName);
|
||||||
if (r != null) {
|
return;
|
||||||
requester = this.services.getFlushRequester();
|
|
||||||
if (requester != null) {
|
|
||||||
// force flushing all stores to clean old logs
|
|
||||||
requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
|
|
||||||
scheduled = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (!scheduled) {
|
FlushRequester requester = this.services.getFlushRequester();
|
||||||
LOG.warn("Failed to schedule flush of {}, region={}, requester={}",
|
if (requester == null) {
|
||||||
Bytes.toString(encodedRegionName), r, requester);
|
LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null",
|
||||||
|
encodedRegionName, r);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
// force flushing all stores to clean old logs
|
||||||
|
requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -239,12 +231,7 @@ public class LogRoller extends HasThread implements Closeable {
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public boolean walRollFinished() {
|
public boolean walRollFinished() {
|
||||||
for (boolean needRoll : walNeedsRoll.values()) {
|
return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll);
|
||||||
if (needRoll) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -280,6 +280,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
*/
|
*/
|
||||||
protected final String implClassName;
|
protected final String implClassName;
|
||||||
|
|
||||||
|
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
|
||||||
|
|
||||||
public long getFilenum() {
|
public long getFilenum() {
|
||||||
return this.filenum.get();
|
return this.filenum.get();
|
||||||
}
|
}
|
||||||
|
@ -681,11 +683,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
|
||||||
* Cleans up current writer closing it and then puts in place the passed in
|
* <p/>
|
||||||
* <code>nextWriter</code>.
|
|
||||||
* </p>
|
|
||||||
* <p>
|
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li>In the case of creating a new WAL, oldPath will be null.</li>
|
* <li>In the case of creating a new WAL, oldPath will be null.</li>
|
||||||
* <li>In the case of rolling over from one file to the next, none of the parameters will be null.
|
* <li>In the case of rolling over from one file to the next, none of the parameters will be null.
|
||||||
|
@ -693,7 +692,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
* <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
|
* <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
|
||||||
* null.</li>
|
* null.</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
* </p>
|
|
||||||
* @param oldPath may be null
|
* @param oldPath may be null
|
||||||
* @param newPath may be null
|
* @param newPath may be null
|
||||||
* @param nextWriter may be null
|
* @param nextWriter may be null
|
||||||
|
@ -875,8 +873,14 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
return cachedSyncFutures.get().reset(sequence);
|
return cachedSyncFutures.get().reset(sequence);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected boolean isLogRollRequested() {
|
||||||
|
return rollRequested.get();
|
||||||
|
}
|
||||||
|
|
||||||
protected final void requestLogRoll(boolean tooFewReplicas) {
|
protected final void requestLogRoll(boolean tooFewReplicas) {
|
||||||
if (!this.listeners.isEmpty()) {
|
// If we have already requested a roll, don't do it again
|
||||||
|
// And only set rollRequested to true when there is a registered listener
|
||||||
|
if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
|
||||||
for (WALActionsListener i : this.listeners) {
|
for (WALActionsListener i : this.listeners) {
|
||||||
i.logRollRequested(tooFewReplicas);
|
i.logRollRequested(tooFewReplicas);
|
||||||
}
|
}
|
||||||
|
@ -1031,6 +1035,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
protected abstract W createWriterInstance(Path path)
|
protected abstract W createWriterInstance(Path path)
|
||||||
throws IOException, CommonFSUtils.StreamLacksCapabilityException;
|
throws IOException, CommonFSUtils.StreamLacksCapabilityException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer
|
||||||
|
* will begin to work before returning from this method. If we clear the flag after returning from
|
||||||
|
* this call, we may miss a roll request. The implementation class should choose a proper place to
|
||||||
|
* clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you
|
||||||
|
* start writing to the new writer.
|
||||||
|
*/
|
||||||
protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
|
protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
|
|
@ -167,9 +167,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
// notice that, modification to this field is only allowed under the protection of consumeLock.
|
// notice that, modification to this field is only allowed under the protection of consumeLock.
|
||||||
private volatile int epochAndState;
|
private volatile int epochAndState;
|
||||||
|
|
||||||
// used to guard the log roll request when we exceed the log roll size.
|
|
||||||
private boolean rollRequested;
|
|
||||||
|
|
||||||
private boolean readyForRolling;
|
private boolean readyForRolling;
|
||||||
|
|
||||||
private final Condition readyForRollingCond = consumeLock.newCondition();
|
private final Condition readyForRollingCond = consumeLock.newCondition();
|
||||||
|
@ -336,10 +333,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
// closed soon.
|
// closed soon.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (writer.getLength() < logrollsize || rollRequested) {
|
if (writer.getLength() < logrollsize || isLogRollRequested()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
rollRequested = true;
|
|
||||||
requestLogRoll();
|
requestLogRoll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -666,7 +662,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
|
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
|
||||||
}
|
}
|
||||||
this.fileLengthAtLastSync = nextWriter.getLength();
|
this.fileLengthAtLastSync = nextWriter.getLength();
|
||||||
this.rollRequested = false;
|
|
||||||
this.highestProcessedAppendTxidAtLastSync = 0L;
|
this.highestProcessedAppendTxidAtLastSync = 0L;
|
||||||
consumeLock.lock();
|
consumeLock.lock();
|
||||||
try {
|
try {
|
||||||
|
@ -675,6 +670,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
|
int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
|
||||||
// set a new epoch and also clear waitingRoll and writerBroken
|
// set a new epoch and also clear waitingRoll and writerBroken
|
||||||
this.epochAndState = nextEpoch << 2;
|
this.epochAndState = nextEpoch << 2;
|
||||||
|
// Reset rollRequested status
|
||||||
|
rollRequested.set(false);
|
||||||
consumeExecutor.execute(consumer);
|
consumeExecutor.execute(consumer);
|
||||||
} finally {
|
} finally {
|
||||||
consumeLock.unlock();
|
consumeLock.unlock();
|
||||||
|
|
|
@ -381,6 +381,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
} finally {
|
} finally {
|
||||||
// Let the writer thread go regardless, whether error or not.
|
// Let the writer thread go regardless, whether error or not.
|
||||||
if (zigzagLatch != null) {
|
if (zigzagLatch != null) {
|
||||||
|
// Reset rollRequested status
|
||||||
|
rollRequested.set(false);
|
||||||
zigzagLatch.releaseSafePoint();
|
zigzagLatch.releaseSafePoint();
|
||||||
// syncFuture will be null if we failed our wait on safe point above. Otherwise, if
|
// syncFuture will be null if we failed our wait on safe point above. Otherwise, if
|
||||||
// latch was obtained successfully, the sync we threw in either trigger the latch or it
|
// latch was obtained successfully, the sync we threw in either trigger the latch or it
|
||||||
|
|
|
@ -436,7 +436,7 @@ public class TestWALLockup {
|
||||||
// To stop logRoller, its server has to say it is stopped.
|
// To stop logRoller, its server has to say it is stopped.
|
||||||
Mockito.when(server.isStopped()).thenReturn(true);
|
Mockito.when(server.isStopped()).thenReturn(true);
|
||||||
if (logRoller != null) {
|
if (logRoller != null) {
|
||||||
logRoller.interrupt();
|
logRoller.close();
|
||||||
}
|
}
|
||||||
if (dodgyWAL != null) {
|
if (dodgyWAL != null) {
|
||||||
dodgyWAL.close();
|
dodgyWAL.close();
|
||||||
|
|
|
@ -17,18 +17,49 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.LogRoller;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.SequenceId;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALKey;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||||
|
@ -39,7 +70,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
/**
|
/**
|
||||||
* Provides AsyncFSWAL test cases.
|
* Provides AsyncFSWAL test cases.
|
||||||
*/
|
*/
|
||||||
@Category({ RegionServerTests.class, MediumTests.class })
|
@Category({ RegionServerTests.class, LargeTests.class })
|
||||||
public class TestAsyncFSWAL extends AbstractTestFSWAL {
|
public class TestAsyncFSWAL extends AbstractTestFSWAL {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
|
@ -90,4 +121,101 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
|
||||||
asyncFSWAL.init();
|
asyncFSWAL.init();
|
||||||
return asyncFSWAL;
|
return asyncFSWAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBrokenWriter() throws Exception {
|
||||||
|
Server server = mock(Server.class);
|
||||||
|
when(server.getConfiguration()).thenReturn(CONF);
|
||||||
|
RegionServerServices services = mock(RegionServerServices.class);
|
||||||
|
TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
|
||||||
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
||||||
|
RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
|
||||||
|
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||||
|
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
for (byte[] fam : td.getColumnFamilyNames()) {
|
||||||
|
scopes.put(fam, 0);
|
||||||
|
}
|
||||||
|
long timestamp = System.currentTimeMillis();
|
||||||
|
String testName = currentTest.getMethodName();
|
||||||
|
AtomicInteger failedCount = new AtomicInteger(0);
|
||||||
|
try (LogRoller roller = new LogRoller(server, services);
|
||||||
|
AsyncFSWAL wal = new AsyncFSWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
|
||||||
|
testName, CONF, null, true, null, null, GROUP, CHANNEL_CLASS) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AsyncWriter createWriterInstance(Path path) throws IOException {
|
||||||
|
AsyncWriter writer = super.createWriterInstance(path);
|
||||||
|
return new AsyncWriter() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLength() {
|
||||||
|
return writer.getLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Long> sync() {
|
||||||
|
CompletableFuture<Long> result = writer.sync();
|
||||||
|
if (failedCount.incrementAndGet() < 1000) {
|
||||||
|
CompletableFuture<Long> future = new CompletableFuture<>();
|
||||||
|
FutureUtils.addListener(result,
|
||||||
|
(r, e) -> future.completeExceptionally(new IOException("Inject Error")));
|
||||||
|
return future;
|
||||||
|
} else {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void append(Entry entry) {
|
||||||
|
writer.append(entry);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}) {
|
||||||
|
wal.init();
|
||||||
|
roller.addWAL(wal);
|
||||||
|
roller.start();
|
||||||
|
int numThreads = 10;
|
||||||
|
AtomicReference<Exception> error = new AtomicReference<>();
|
||||||
|
Thread[] threads = new Thread[numThreads];
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
final int index = i;
|
||||||
|
threads[index] = new Thread("Write-Thread-" + index) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
byte[] row = Bytes.toBytes("row" + index);
|
||||||
|
WALEdit cols = new WALEdit();
|
||||||
|
cols.add(new KeyValue(row, row, row, timestamp + index, row));
|
||||||
|
WALKeyImpl key = new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(),
|
||||||
|
SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
|
||||||
|
HConstants.NO_NONCE, mvcc, scopes);
|
||||||
|
try {
|
||||||
|
wal.append(ri, key, cols, true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// should not happen
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
wal.sync();
|
||||||
|
} catch (IOException e) {
|
||||||
|
error.set(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
for (Thread t : threads) {
|
||||||
|
t.start();
|
||||||
|
}
|
||||||
|
for (Thread t : threads) {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
assertNull(error.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue