HBASE-10142 TestLogRolling#testLogRollOnDatanodeDeath test failure
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1551806 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f97e2d8d19
commit
46e9d08dba
|
@ -39,6 +39,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -123,7 +124,6 @@ class FSHLog implements HLog, Syncable {
|
|||
private final AtomicLong syncedTillHere = new AtomicLong(0);
|
||||
private long lastDeferredTxid;
|
||||
private final Path oldLogDir;
|
||||
private volatile boolean logRollRunning;
|
||||
|
||||
// all writes pending on AsyncWriter/AsyncSyncer thread with
|
||||
// txid <= failedTxid will fail by throwing asyncIOE
|
||||
|
@ -158,7 +158,7 @@ class FSHLog implements HLog, Syncable {
|
|||
* This lock makes sure only one log roll runs at the same time. Should not be taken while
|
||||
* any other lock is held. We don't just use synchronized because that results in bogus and
|
||||
* tedious findbugs warning when it thinks synchronized controls writer thread safety */
|
||||
private final Object rollWriterLock = new Object();
|
||||
private final ReentrantLock rollWriterLock = new ReentrantLock(true);
|
||||
|
||||
/**
|
||||
* Map of encoded region names to their most recent sequence/edit id in their memstore.
|
||||
|
@ -506,7 +506,8 @@ class FSHLog implements HLog, Syncable {
|
|||
@Override
|
||||
public byte [][] rollWriter(boolean force)
|
||||
throws FailedLogCloseException, IOException {
|
||||
synchronized (rollWriterLock) {
|
||||
rollWriterLock.lock();
|
||||
try {
|
||||
// Return if nothing to flush.
|
||||
if (!force && this.writer != null && this.numEntries.get() <= 0) {
|
||||
return null;
|
||||
|
@ -517,7 +518,6 @@ class FSHLog implements HLog, Syncable {
|
|||
return null;
|
||||
}
|
||||
try {
|
||||
this.logRollRunning = true;
|
||||
if (!closeBarrier.beginOp()) {
|
||||
LOG.debug("HLog closing. Skipping rolling of writer");
|
||||
return regionsToFlush;
|
||||
|
@ -595,10 +595,11 @@ class FSHLog implements HLog, Syncable {
|
|||
regionsToFlush = findRegionsToForceFlush();
|
||||
}
|
||||
} finally {
|
||||
this.logRollRunning = false;
|
||||
closeBarrier.endOp();
|
||||
}
|
||||
return regionsToFlush;
|
||||
} finally {
|
||||
rollWriterLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1226,10 +1227,15 @@ class FSHLog implements HLog, Syncable {
|
|||
asyncNotifier.setFlushedTxid(this.lastSyncedTxid);
|
||||
|
||||
// 4. check and do logRoll if needed
|
||||
if (!logRollRunning) {
|
||||
checkLowReplication();
|
||||
boolean logRollNeeded = false;
|
||||
if (rollWriterLock.tryLock()) {
|
||||
try {
|
||||
if (writer != null && writer.getLength() > logrollsize) {
|
||||
logRollNeeded = checkLowReplication();
|
||||
} finally {
|
||||
rollWriterLock.unlock();
|
||||
}
|
||||
try {
|
||||
if (logRollNeeded || writer != null && writer.getLength() > logrollsize) {
|
||||
requestLogRoll();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -1332,7 +1338,11 @@ class FSHLog implements HLog, Syncable {
|
|||
@Override
|
||||
public void postAppend(List<Entry> entries) {}
|
||||
|
||||
private void checkLowReplication() {
|
||||
/*
|
||||
* @return whether log roll should be requested
|
||||
*/
|
||||
private boolean checkLowReplication() {
|
||||
boolean logRollNeeded = false;
|
||||
// if the number of replicas in HDFS has fallen below the configured
|
||||
// value, then roll logs.
|
||||
try {
|
||||
|
@ -1345,7 +1355,7 @@ class FSHLog implements HLog, Syncable {
|
|||
+ numCurrentReplicas + " replicas but expecting no less than "
|
||||
+ this.minTolerableReplication + " replicas. "
|
||||
+ " Requesting close of hlog.");
|
||||
requestLogRoll();
|
||||
logRollNeeded = true;
|
||||
// If rollWriter is requested, increase consecutiveLogRolls. Once it
|
||||
// is larger than lowReplicationRollLimit, disable the
|
||||
// LowReplication-Roller
|
||||
|
@ -1364,7 +1374,7 @@ class FSHLog implements HLog, Syncable {
|
|||
// So we should not enable LowReplication-Roller. If numEntries
|
||||
// is lower than or equals 1, we consider it as a new writer.
|
||||
if (this.numEntries.get() <= 1) {
|
||||
return;
|
||||
return logRollNeeded;
|
||||
}
|
||||
// Once the live datanode number and the replicas return to normal,
|
||||
// enable the LowReplication-Roller.
|
||||
|
@ -1376,6 +1386,7 @@ class FSHLog implements HLog, Syncable {
|
|||
LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
|
||||
" still proceeding ahead...");
|
||||
}
|
||||
return logRollNeeded;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -396,7 +396,7 @@ public class TestLogRolling {
|
|||
// the configured value 2.
|
||||
assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
|
||||
|
||||
batchWriteAndWait(table, 3, false, 10000);
|
||||
batchWriteAndWait(table, 3, false, 14000);
|
||||
assertTrue("LowReplication Roller should've been disabled, current replication="
|
||||
+ ((FSHLog) log).getLogReplication(),
|
||||
!log.isLowReplicationRollEnabled());
|
||||
|
|
Loading…
Reference in New Issue