diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 7cc6a360ef2..0c773044b18 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -511,6 +511,17 @@ public final class HConstants { */ public static long DEFAULT_HBASE_CLIENT_PAUSE = 1000; + /** + * Parameter name for server pause value, used mostly as value to wait before + * running a retry of a failed operation. + */ + public static String HBASE_SERVER_PAUSE = "hbase.server.pause"; + + /** + * Default value of {@link #HBASE_SERVER_PAUSE}. + */ + public static int DEFAULT_HBASE_SERVER_PAUSE = 1000; + /** * Parameter name for maximum retries, used as maximum for all retryable * operations such as fetching of the root region from root region server, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index dfd8f98898a..bace031c669 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -156,6 +156,10 @@ public class HStore implements Store, StoreConfiguration { private final Compactor compactor; + private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10; + private static int flush_retries_number; + private static int pauseTime; + /** * Constructor * @param basedir qualified path under which the region directory lives; @@ -223,6 +227,17 @@ public class HStore implements Store, StoreConfiguration { this.compactor = new Compactor(conf); // Create a compaction manager. this.compactionPolicy = new CompactionPolicy(conf, this); + if (HStore.flush_retries_number == 0) { + HStore.flush_retries_number = conf.getInt( + "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER); + HStore.pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, + HConstants.DEFAULT_HBASE_SERVER_PAUSE); + if (HStore.flush_retries_number <= 0) { + throw new IllegalArgumentException( + "hbase.hstore.flush.retries.number must be > 0, not " + + HStore.flush_retries_number); + } + } } /** @@ -718,8 +733,43 @@ public class HStore implements Store, StoreConfiguration { // If an exception happens flushing, we let it out without clearing // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. - return internalFlushCache( - snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status); + // Retry after catching exception when flushing, otherwise server will abort + // itself + IOException lastException = null; + for (int i = 0; i < HStore.flush_retries_number; i++) { + try { + Path pathName = internalFlushCache(snapshot, logCacheFlushId, + snapshotTimeRangeTracker, flushedSize, status); + try { + // Path name is null if there is no entry to flush + if (pathName != null) { + validateStoreFile(pathName); + } + return pathName; + } catch (Exception e) { + LOG.warn("Failed validating store file " + pathName + + ", retring num=" + i, e); + if (e instanceof IOException) { + lastException = (IOException) e; + } else { + lastException = new IOException(e); + } + } + } catch (IOException e) { + LOG.warn("Failed flushing store file, retring num=" + i, e); + lastException = e; + } + if (lastException != null) { + try { + Thread.sleep(pauseTime); + } catch (InterruptedException e) { + IOException iie = new InterruptedIOException(); + iie.initCause(e); + throw iie; + } + } + } + throw lastException; } /* @@ -841,7 +891,6 @@ public class HStore implements Store, StoreConfiguration { // Write-out finished successfully, move into the right spot String fileName = path.getName(); Path dstPath = new Path(homedir, fileName); - validateStoreFile(path); String msg = "Renaming flushed file at " + path + " to " + dstPath; LOG.debug(msg); status.setStatus("Flushing " + this + ": " + msg);