From 591f7819f30727b15fe678258bfc10c1567ca9b4 Mon Sep 17 00:00:00 2001 From: Xiaolin Ha Date: Mon, 7 Mar 2022 12:20:01 +0800 Subject: [PATCH] HBASE-26552 Introduce retry to logroller to avoid abort (#4038) Signed-off-by: Andrew Purtell --- .../hadoop/hbase/wal/AbstractWALRoller.java | 59 +++++++++++++++---- 1 file changed, 49 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index 3ad1c5cd17c..fb9f3de91a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -60,6 +60,18 @@ public abstract class AbstractWALRoller extends Thread protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period"; + /** + * Configuration key for the timeout of log rolling retry. + */ + protected static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms"; + /** + * Configuration key for the max count of log rolling retries. + * The real retry count is also limited by the timeout of log rolling + * via {@link #WAL_ROLL_WAIT_TIMEOUT}. + */ + protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries"; + protected final ConcurrentMap wals = new ConcurrentHashMap<>(); protected final T abortable; // Period to roll log. 
@@ -67,6 +79,10 @@ public abstract class AbstractWALRoller extends Thread private final int threadWakeFrequency; // The interval to check low replication on hlog's pipeline private final long checkLowReplicationInterval; + // Wait period for roll log + private final long rollWaitTimeout; + // Max retry for roll log + private final int maxRollRetry; private volatile boolean running = true; @@ -114,6 +130,9 @@ public abstract class AbstractWALRoller extends Thread this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); this.checkLowReplicationInterval = conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000); + this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, 30000); + // retry rolling does not have to be the default behavior, so the default value is 0 here + this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0); } /** @@ -184,18 +203,38 @@ public abstract class AbstractWALRoller extends Thread } else { continue; } - try { - // Force the roll if the logroll.period is elapsed or if a roll was requested. - // The returned value is an collection of actual region and family names. - Map> regionsToFlush = controller.rollWal(now); - if (regionsToFlush != null) { - for (Map.Entry> r : regionsToFlush.entrySet()) { - scheduleFlush(Bytes.toString(r.getKey()), r.getValue()); + Map> regionsToFlush = null; + int nAttempts = 0; + long startWaiting = EnvironmentEdgeManager.currentTime(); + do { + try { + // Force the roll if the logroll.period is elapsed or if a roll was requested. + // The returned value is a collection of actual region and family names. + regionsToFlush = controller.rollWal(EnvironmentEdgeManager.currentTime()); + break; + } catch (IOException ioe) { + if (ioe instanceof WALClosedException) { + LOG.warn("WAL has been closed. 
Skipping rolling of writer and just remove it", ioe); + iter.remove(); + break; + } + long waitingTime = EnvironmentEdgeManager.currentTime() - startWaiting; + if (waitingTime < rollWaitTimeout && nAttempts < maxRollRetry) { + nAttempts++; + LOG.warn("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry," + + " last exception= {}", nAttempts, waitingTime, + ioe.getCause().getClass().getSimpleName()); + sleep(1000); + } else { + LOG.error("Roll wal failed and waiting timeout, will not retry", ioe); + throw ioe; + } } - } catch (WALClosedException e) { - LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e); - iter.remove(); + } while (EnvironmentEdgeManager.currentTime() - startWaiting < rollWaitTimeout); + if (regionsToFlush != null) { + for (Map.Entry> r : regionsToFlush.entrySet()) { + scheduleFlush(Bytes.toString(r.getKey()), r.getValue()); + } } } } catch (FailedLogCloseException | ConnectException e) {