HBASE-14401 Stamp failed appends with sequenceid too.... Cleans up latches
parent bf46fc5542
commit 72b4c906b8
@@ -279,8 +279,6 @@ public class FSHLog implements WAL {
   private final int slowSyncNs;
 
-  private final static Object [] NO_ARGS = new Object []{};
-
   // If live datanode count is lower than the default replicas value,
   // RollWriter will be triggered in each sync(So the RollWriter will be
   // triggered one by one in a short time). Using it as a workaround to slow
@@ -821,8 +819,7 @@ public class FSHLog implements WAL {
       } catch (FailedSyncBeforeLogCloseException e) {
         // If unflushed/unsynced entries on close, it is reason to abort.
         if (isUnflushedEntries()) throw e;
-        // Else, let is pass through to the close.
-        LOG.warn("Failed sync but no outstanding unsync'd edits so falling through to close; " +
+        LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
           e.getMessage());
       }
@@ -1332,8 +1329,8 @@ public class FSHLog implements WAL {
         }
       }
     } catch (Exception e) {
-      LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
-        " still proceeding ahead...");
+      LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e +
+        ", continuing...");
     }
     return logRollNeeded;
   }
@@ -1725,7 +1722,9 @@ public class FSHLog implements WAL {
       // add appends to dfsclient as they come in. Batching appends doesn't give any significant
       // benefit on measurement. Handler sync calls we will batch up. If we get an exception
       // appending an edit, we fail all subsequent appends and syncs with the same exception until
-      // the WAL is reset.
+      // the WAL is reset. It is important that we not short-circuit and exit early this method.
+      // It is important that we always go through the attainSafePoint on the end. Another thread,
+      // the log roller may be waiting on a signal from us here and will just hang without it.
 
       try {
         if (truck.hasSyncFuturePayload()) {
@@ -1736,15 +1735,20 @@ public class FSHLog implements WAL {
           TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
           try {
             FSWALEntry entry = truck.unloadFSWALEntryPayload();
-            // If already an exception, do not try to append. Throw.
-            if (this.exception != null) throw this.exception;
+            if (this.exception != null) {
+              // We got an exception on an earlier attempt at append. Do not let this append
+              // go through. Fail it but stamp the sequenceid into this append though failed.
+              // We need to do this to close the latch held down deep in WALKey...that is waiting
+              // on sequenceid assignment otherwise it will just hang out (The #append method
+              // called below does this also internally).
+              entry.stampRegionSequenceId();
+              // Return to keep processing events coming off the ringbuffer
+              return;
+            }
             append(entry);
           } catch (Exception e) {
-            // Failed append. Record the exception. Throw it from here on out til new WAL in place
-            this.exception = new DamagedWALException(e);
-            // If append fails, presume any pending syncs will fail too; let all waiting handlers
-            // know of the exception.
-            cleanupOutstandingSyncsOnException(sequence, this.exception);
+            // Failed append. Record the exception.
+            this.exception = e;
             // Return to keep processing events coming off the ringbuffer
             return;
           } finally {
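The block above is the core of the fix: an append that fails must still be stamped with a sequence id, because a handler thread is parked on a latch deep inside the WALKey waiting for that assignment. A minimal, self-contained sketch of the pattern in plain Java (illustrative names, not HBase classes):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative only: a key whose sequence id is assigned later by a single writer thread.
class SketchKey {
  private final CountDownLatch assigned = new CountDownLatch(1);
  private volatile long sequenceId = -1;

  // Called by the writer thread for successful AND failed appends.
  void stampSequenceId(long id) {
    this.sequenceId = id;
    this.assigned.countDown(); // releases any handler blocked in getSequenceId()
  }

  // Called by the handler thread that submitted the append.
  long getSequenceId() throws InterruptedException {
    assigned.await(); // without the stamp-on-failure rule, a failed append would hang here
    return sequenceId;
  }
}

public class StampOnFailureSketch {
  public static void main(String[] args) throws Exception {
    SketchKey key = new SketchKey();
    Thread handler = new Thread(() -> {
      try {
        System.out.println("handler saw sequenceId=" + key.getSequenceId());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    handler.start();
    // Writer thread: the append "fails", but we still stamp so the handler is released.
    key.stampSequenceId(42L);
    handler.join(TimeUnit.SECONDS.toMillis(5));
  }
}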
@@ -1752,7 +1756,7 @@ public class FSHLog implements WAL {
             scope.close(); // append scope is complete
           }
         } else {
-          // They can't both be null. Fail all up to this!!!
+          // What is this if not an append or sync. Fail all up to this!!!
           cleanupOutstandingSyncsOnException(sequence,
             new IllegalStateException("Neither append nor sync"));
           // Return to keep processing.
@@ -1771,23 +1775,22 @@ public class FSHLog implements WAL {
           LOG.trace("Sequence=" + sequence + ", syncCount=" + this.syncFuturesCount);
         }
 
-        // Below expects that the offer 'transfers' responsibility for the outstanding syncs to the
-        // syncRunner. We should never get an exception in here.
-        int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
-        try {
-          if (this.exception != null) {
-            // Do not try to sync. If a this.exception, then we failed an append. Do not try to
-            // sync a failed append. Fall through to the attainSafePoint below. It is part of the
-            // means by which we put in place a new WAL. A new WAL is how we clean up.
-            // Don't throw because then we'll not get to attainSafePoint.
-            cleanupOutstandingSyncsOnException(sequence, this.exception);
-          } else {
+        if (this.exception == null) {
+          // Below expects that the offer 'transfers' responsibility for the outstanding syncs to
+          // the syncRunner. We should never get an exception in here.
+          int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
+          try {
             this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount);
+          } catch (Exception e) {
+            // Should NEVER get here.
+            requestLogRoll();
+            this.exception = new DamagedWALException("Failed offering sync", e);
           }
-        } catch (Exception e) {
-          // Should NEVER get here.
-          cleanupOutstandingSyncsOnException(sequence, e);
-          throw e;
         }
+        // We may have picked up an exception above trying to offer sync
+        if (this.exception != null) {
+          cleanupOutstandingSyncsOnException(sequence,
+            new DamagedWALException("On sync", this.exception));
+        }
         attainSafePoint(sequence);
         this.syncFuturesCount = 0;
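With the restructuring above, work is only offered to a SyncRunner when no append has failed; otherwise every outstanding sync is failed with a DamagedWALException so waiting handlers are released and the WAL can be rolled. A rough sketch of that "fail the waiters rather than hang them" idea, using plain CompletableFutures instead of HBase's SyncFuture (illustrative, assumed names):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FailOutstandingSyncsSketch {
  private final List<CompletableFuture<Long>> outstanding = new ArrayList<>();
  private Exception exception; // set once an append has failed

  CompletableFuture<Long> requestSync() {
    CompletableFuture<Long> f = new CompletableFuture<>();
    if (this.exception != null) {
      // Subsequent syncs keep failing with the same cause until the WAL is replaced.
      f.completeExceptionally(this.exception);
      return f;
    }
    outstanding.add(f);
    return f;
  }

  // On a failed append, do not hang the waiters: complete them all exceptionally.
  void cleanupOutstandingSyncsOnException(Exception cause) {
    for (CompletableFuture<Long> f : outstanding) {
      f.completeExceptionally(cause);
    }
    outstanding.clear();
  }

  void onAppendFailure(Exception e) {
    this.exception = e;
    cleanupOutstandingSyncsOnException(new RuntimeException("WAL damaged", e));
  }

  public static void main(String[] args) {
    FailOutstandingSyncsSketch wal = new FailOutstandingSyncsSketch();
    CompletableFuture<Long> sync = wal.requestSync();
    wal.onAppendFailure(new Exception("append failed"));
    System.out.println("pending sync failed: " + sync.isCompletedExceptionally());
  }
}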
@@ -1883,9 +1886,11 @@ public class FSHLog implements WAL {
         // Update metrics.
         postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
       } catch (Exception e) {
-        LOG.warn("Could not append. Requesting close of WAL", e);
+        String msg = "Append sequenceId=" + regionSequenceId +
+          ", requesting roll of WAL";
+        LOG.warn(msg, e);
         requestLogRoll();
-        throw e;
+        throw new DamagedWALException(msg, e);
       }
       numEntries.incrementAndGet();
     }
@@ -315,8 +315,12 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    */
   public long getSequenceId(final long maxWaitForSeqId) throws IOException {
     // TODO: This implementation waiting on a latch is problematic because if a higher level
-    // determines we should stop or abort, there is not global list of all these blocked WALKeys
-    // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId
+    // determines we should stop or abort, there is no global list of all these blocked WALKeys
+    // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId.
+    //
+    // UPDATE: I think we can remove the timeout now we are stamping all walkeys with sequenceid,
+    // even those that have failed (previously we were not... so they would just hang out...).
+    // St.Ack 20150910
     try {
       if (maxWaitForSeqId < 0) {
         this.seqNumAssignedLatch.await();
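For context, the wait the TODO describes is a plain timed latch await: if the latch is never counted down, as used to happen for failed appends, the caller either hangs or times out. A hedged sketch of that shape in plain Java (illustrative only, not the WALKey source):

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedSequenceIdWaitSketch {
  private final CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
  private volatile long sequenceId = -1;

  void stamp(long id) {
    this.sequenceId = id;
    this.seqNumAssignedLatch.countDown();
  }

  // A negative maxWaitMs means wait forever, as in the method above.
  long getSequenceId(long maxWaitMs) throws IOException {
    try {
      if (maxWaitMs < 0) {
        this.seqNumAssignedLatch.await();
      } else if (!this.seqNumAssignedLatch.await(maxWaitMs, TimeUnit.MILLISECONDS)) {
        throw new IOException("Timed out waiting " + maxWaitMs + "ms for sequence id");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new InterruptedIOException("Interrupted waiting for sequence id");
    }
    return this.sequenceId;
  }

  public static void main(String[] args) throws IOException {
    TimedSequenceIdWaitSketch key = new TimedSequenceIdWaitSketch();
    key.stamp(7L); // once every append, even a failed one, is stamped, the timeout is redundant
    System.out.println(key.getSequenceId(1000));
  }
}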
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,11 +34,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -78,6 +81,8 @@ public class TestFailedAppendAndSync {
   public void setup() throws IOException {
     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     CONF = TEST_UTIL.getConfiguration();
+    // Disable block cache.
+    CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
     dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
     tableName = TableName.valueOf(name.getMethodName());
   }
@@ -101,6 +106,7 @@ public class TestFailedAppendAndSync {
    */
   @Test (timeout=300000)
   public void testLockupAroundBadAssignSync() throws IOException {
+    final AtomicLong rolls = new AtomicLong(0);
     // Dodgy WAL. Will throw exceptions when flags set.
     class DodgyFSLog extends FSHLog {
       volatile boolean throwSyncException = false;
@@ -111,6 +117,13 @@ public class TestFailedAppendAndSync {
         super(fs, root, logDir, conf);
       }
 
+      @Override
+      public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
+        byte [][] regions = super.rollWriter(force);
+        rolls.getAndIncrement();
+        return regions;
+      }
+
       @Override
       protected Writer createWriterInstance(Path path) throws IOException {
         final Writer w = super.createWriterInstance(path);
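The rollWriter override above only counts completed rolls; the test later spin-waits until that counter moves, so it knows the roll requested after a failed append or sync has actually happened (see the while (rollsCount == rolls.get()) loops in the hunks below). A small sketch of that wait-for-counter test idiom in plain Java (illustrative, not the test itself):

import java.util.concurrent.atomic.AtomicLong;

public class WaitForRollSketch {
  public static void main(String[] args) throws InterruptedException {
    final AtomicLong rolls = new AtomicLong(0);

    // Stand-in for the log roller thread bumping the counter after a roll completes.
    Thread roller = new Thread(() -> {
      try {
        Thread.sleep(200); // simulate the roll taking a while
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      rolls.getAndIncrement();
    });

    long rollsCount = rolls.get();
    roller.start();
    // Same shape as the test: block until at least one new roll has been observed.
    while (rollsCount == rolls.get()) {
      Thread.sleep(100);
    }
    System.out.println("observed rolls=" + rolls.get());
    roller.join();
  }
}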
@@ -175,7 +188,7 @@ public class TestFailedAppendAndSync {
       } catch (IOException ioe) {
         fail();
       }
-
+      long rollsCount = rolls.get();
       try {
         dodgyWAL.throwAppendException = true;
         dodgyWAL.throwSyncException = false;
@@ -185,6 +198,9 @@ public class TestFailedAppendAndSync {
       } catch (IOException ioe) {
         threwOnAppend = true;
       }
+      while (rollsCount == rolls.get()) Threads.sleep(100);
+      rollsCount = rolls.get();
+
       // When we get to here.. we should be ok. A new WAL has been put in place. There were no
       // appends to sync. We should be able to continue.
 
@@ -197,6 +213,8 @@ public class TestFailedAppendAndSync {
       } catch (IOException ioe) {
         threwOnBoth = true;
       }
+      while (rollsCount == rolls.get()) Threads.sleep(100);
+
       // Again, all should be good. New WAL and no outstanding unsync'd edits so we should be able
       // to just continue.
 
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
@@ -81,6 +82,8 @@ public class TestWALLockup {
   public void setup() throws IOException {
     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     CONF = TEST_UTIL.getConfiguration();
+    // Disable block cache.
+    CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
     dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
     tableName = TableName.valueOf(name.getMethodName());
   }
@@ -139,10 +142,10 @@ public class TestWALLockup {
       protected void beforeWaitOnSafePoint() {
         if (throwException) {
           LOG.info("COUNTDOWN");
-          // Don't countdown latch until someone waiting on it.
-          while (this.latch.getCount() <= 0) {
-            Threads.sleep(10);
-          }
+          // Don't countdown latch until someone waiting on it otherwise, the above
+          // afterCreatingZigZagLatch will get to the latch and no one will ever free it and we'll
+          // be stuck; test won't go down
+          while (this.latch.getCount() <= 0) Threads.sleep(1);
           this.latch.countDown();
         }
       }