HBASE-26552 Introduce retry to logroller to avoid abort (#4171)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
45801a7d74
commit
77dbf7a059
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -58,6 +59,19 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
||||||
|
|
||||||
protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
|
protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure for the timeout of log rolling retry.
|
||||||
|
*/
|
||||||
|
protected static final String WAL_ROLL_WAIT_TIMEOUT =
|
||||||
|
"hbase.regionserver.logroll.wait.timeout.ms";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure for the max count of log rolling retry.
|
||||||
|
* The real retry count is also limited by the timeout of log rolling
|
||||||
|
* via {@link #WAL_ROLL_WAIT_TIMEOUT}
|
||||||
|
*/
|
||||||
|
protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries";
|
||||||
|
|
||||||
protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
|
protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
|
||||||
protected final T abortable;
|
protected final T abortable;
|
||||||
// Period to roll log.
|
// Period to roll log.
|
||||||
|
@ -65,6 +79,10 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
||||||
private final int threadWakeFrequency;
|
private final int threadWakeFrequency;
|
||||||
// The interval to check low replication on hlog's pipeline
|
// The interval to check low replication on hlog's pipeline
|
||||||
private final long checkLowReplicationInterval;
|
private final long checkLowReplicationInterval;
|
||||||
|
// Wait period for roll log
|
||||||
|
private final long rollWaitTimeout;
|
||||||
|
// Max retry for roll log
|
||||||
|
private final int maxRollRetry;
|
||||||
|
|
||||||
private volatile boolean running = true;
|
private volatile boolean running = true;
|
||||||
|
|
||||||
|
@ -112,6 +130,9 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
||||||
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
|
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
|
||||||
this.checkLowReplicationInterval =
|
this.checkLowReplicationInterval =
|
||||||
conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
|
conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
|
||||||
|
this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, 30000);
|
||||||
|
// retry rolling does not have to be the default behavior, so the default value is 0 here
|
||||||
|
this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -182,9 +203,29 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
Map<byte[], List<byte[]>> regionsToFlush = null;
|
||||||
|
int nAttempts = 0;
|
||||||
|
long startWaiting = System.currentTimeMillis();
|
||||||
|
do {
|
||||||
|
try {
|
||||||
// Force the roll if the logroll.period is elapsed or if a roll was requested.
|
// Force the roll if the logroll.period is elapsed or if a roll was requested.
|
||||||
// The returned value is an collection of actual region and family names.
|
// The returned value is an collection of actual region and family names.
|
||||||
Map<byte[], List<byte[]>> regionsToFlush = controller.rollWal(now);
|
regionsToFlush = controller.rollWal(System.currentTimeMillis());
|
||||||
|
break;
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
long waitingTime = System.currentTimeMillis() - startWaiting;
|
||||||
|
if (waitingTime < rollWaitTimeout && nAttempts < maxRollRetry) {
|
||||||
|
nAttempts++;
|
||||||
|
LOG.warn("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
|
||||||
|
+ " last excepiton= {}", nAttempts, waitingTime,
|
||||||
|
ioe.getCause().getClass().getSimpleName());
|
||||||
|
sleep(1000);
|
||||||
|
} else {
|
||||||
|
LOG.error("Roll wal failed and waiting timeout, will not retry", ioe);
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (EnvironmentEdgeManager.currentTime() - startWaiting < rollWaitTimeout);
|
||||||
if (regionsToFlush != null) {
|
if (regionsToFlush != null) {
|
||||||
for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
|
for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
|
||||||
scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
|
scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
|
||||||
|
|
Loading…
Reference in New Issue