diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index 3f20dfdd6ea..45db3bfed7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -58,6 +59,19 @@ public abstract class AbstractWALRoller extends Thread protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period"; + /** + * Configure for the timeout of log rolling retry. + */ + protected static final String WAL_ROLL_WAIT_TIMEOUT = + "hbase.regionserver.logroll.wait.timeout.ms"; + + /** + * Configure for the max count of log rolling retry. + * The real retry count is also limited by the timeout of log rolling + * via {@link #WAL_ROLL_WAIT_TIMEOUT} + */ + protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries"; + protected final ConcurrentMap wals = new ConcurrentHashMap<>(); protected final T abortable; // Period to roll log. @@ -65,6 +79,10 @@ public abstract class AbstractWALRoller extends Thread private final int threadWakeFrequency; // The interval to check low replication on hlog's pipeline private final long checkLowReplicationInterval; + // Wait period for roll log + private final long rollWaitTimeout; + // Max retry for roll log + private final int maxRollRetry; private volatile boolean running = true; @@ -112,6 +130,9 @@ public abstract class AbstractWALRoller extends Thread this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); this.checkLowReplicationInterval = conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000); + this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, 30000); + // retry rolling does not have to be the default behavior, so the default value is 0 here + this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0); } /** @@ -182,9 +203,29 @@ public abstract class AbstractWALRoller extends Thread } else { continue; } - // Force the roll if the logroll.period is elapsed or if a roll was requested. - // The returned value is an collection of actual region and family names. - Map> regionsToFlush = controller.rollWal(now); + Map> regionsToFlush = null; + int nAttempts = 0; + long startWaiting = System.currentTimeMillis(); + do { + try { + // Force the roll if the logroll.period is elapsed or if a roll was requested. + // The returned value is an collection of actual region and family names. + regionsToFlush = controller.rollWal(System.currentTimeMillis()); + break; + } catch (IOException ioe) { + long waitingTime = System.currentTimeMillis() - startWaiting; + if (waitingTime < rollWaitTimeout && nAttempts < maxRollRetry) { + nAttempts++; + LOG.warn("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry," + + " last excepiton= {}", nAttempts, waitingTime, + ioe.getCause().getClass().getSimpleName()); + sleep(1000); + } else { + LOG.error("Roll wal failed and waiting timeout, will not retry", ioe); + throw ioe; + } + } + } while (EnvironmentEdgeManager.currentTime() - startWaiting < rollWaitTimeout); if (regionsToFlush != null) { for (Map.Entry> r : regionsToFlush.entrySet()) { scheduleFlush(Bytes.toString(r.getKey()), r.getValue());