HBASE-7507 Make memstore flush be able to retry after exception (Chunhui)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1436111 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a611da16b3
commit
6298623a2f
|
@ -511,6 +511,17 @@ public final class HConstants {
|
||||||
*/
|
*/
|
||||||
public static long DEFAULT_HBASE_CLIENT_PAUSE = 1000;
|
public static long DEFAULT_HBASE_CLIENT_PAUSE = 1000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parameter name for server pause value, used mostly as value to wait before
|
||||||
|
* running a retry of a failed operation.
|
||||||
|
*/
|
||||||
|
public static String HBASE_SERVER_PAUSE = "hbase.server.pause";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default value of {@link #HBASE_SERVER_PAUSE}.
|
||||||
|
*/
|
||||||
|
public static int DEFAULT_HBASE_SERVER_PAUSE = 1000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parameter name for maximum retries, used as maximum for all retryable
|
* Parameter name for maximum retries, used as maximum for all retryable
|
||||||
* operations such as fetching of the root region from root region server,
|
* operations such as fetching of the root region from root region server,
|
||||||
|
|
|
@ -156,6 +156,10 @@ public class HStore implements Store, StoreConfiguration {
|
||||||
|
|
||||||
private final Compactor compactor;
|
private final Compactor compactor;
|
||||||
|
|
||||||
|
private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
|
||||||
|
private static int flush_retries_number;
|
||||||
|
private static int pauseTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param basedir qualified path under which the region directory lives;
|
* @param basedir qualified path under which the region directory lives;
|
||||||
|
@ -223,6 +227,17 @@ public class HStore implements Store, StoreConfiguration {
|
||||||
this.compactor = new Compactor(conf);
|
this.compactor = new Compactor(conf);
|
||||||
// Create a compaction manager.
|
// Create a compaction manager.
|
||||||
this.compactionPolicy = new CompactionPolicy(conf, this);
|
this.compactionPolicy = new CompactionPolicy(conf, this);
|
||||||
|
if (HStore.flush_retries_number == 0) {
|
||||||
|
HStore.flush_retries_number = conf.getInt(
|
||||||
|
"hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
|
||||||
|
HStore.pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE,
|
||||||
|
HConstants.DEFAULT_HBASE_SERVER_PAUSE);
|
||||||
|
if (HStore.flush_retries_number <= 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"hbase.hstore.flush.retries.number must be > 0, not "
|
||||||
|
+ HStore.flush_retries_number);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -718,8 +733,43 @@ public class HStore implements Store, StoreConfiguration {
|
||||||
// If an exception happens flushing, we let it out without clearing
|
// If an exception happens flushing, we let it out without clearing
|
||||||
// the memstore snapshot. The old snapshot will be returned when we say
|
// the memstore snapshot. The old snapshot will be returned when we say
|
||||||
// 'snapshot', the next time flush comes around.
|
// 'snapshot', the next time flush comes around.
|
||||||
return internalFlushCache(
|
// Retry after catching exception when flushing, otherwise server will abort
|
||||||
snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
|
// itself
|
||||||
|
IOException lastException = null;
|
||||||
|
for (int i = 0; i < HStore.flush_retries_number; i++) {
|
||||||
|
try {
|
||||||
|
Path pathName = internalFlushCache(snapshot, logCacheFlushId,
|
||||||
|
snapshotTimeRangeTracker, flushedSize, status);
|
||||||
|
try {
|
||||||
|
// Path name is null if there is no entry to flush
|
||||||
|
if (pathName != null) {
|
||||||
|
validateStoreFile(pathName);
|
||||||
|
}
|
||||||
|
return pathName;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Failed validating store file " + pathName
|
||||||
|
+ ", retring num=" + i, e);
|
||||||
|
if (e instanceof IOException) {
|
||||||
|
lastException = (IOException) e;
|
||||||
|
} else {
|
||||||
|
lastException = new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed flushing store file, retring num=" + i, e);
|
||||||
|
lastException = e;
|
||||||
|
}
|
||||||
|
if (lastException != null) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(pauseTime);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
IOException iie = new InterruptedIOException();
|
||||||
|
iie.initCause(e);
|
||||||
|
throw iie;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw lastException;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -841,7 +891,6 @@ public class HStore implements Store, StoreConfiguration {
|
||||||
// Write-out finished successfully, move into the right spot
|
// Write-out finished successfully, move into the right spot
|
||||||
String fileName = path.getName();
|
String fileName = path.getName();
|
||||||
Path dstPath = new Path(homedir, fileName);
|
Path dstPath = new Path(homedir, fileName);
|
||||||
validateStoreFile(path);
|
|
||||||
String msg = "Renaming flushed file at " + path + " to " + dstPath;
|
String msg = "Renaming flushed file at " + path + " to " + dstPath;
|
||||||
LOG.debug(msg);
|
LOG.debug(msg);
|
||||||
status.setStatus("Flushing " + this + ": " + msg);
|
status.setStatus("Flushing " + this + ": " + msg);
|
||||||
|
|
Loading…
Reference in New Issue