HBASE-10324 refactor deferred-log-flush/Durability related interface/code/naming to align with changed semantic of the new write thread model
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1557939 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
185aec9c24
commit
5f9ef02234
|
@ -597,36 +597,31 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Check if deferred log edits are enabled on the table.
|
||||
* Check if async log edits are enabled on the table.
|
||||
*
|
||||
* @return true if that deferred log flush is enabled on the table
|
||||
* @return true if that async log flush is enabled on the table
|
||||
*
|
||||
* @see #setDeferredLogFlush(boolean)
|
||||
* @deprecated use {@link #getDurability()}
|
||||
* @see #setAsyncLogFlush(boolean)
|
||||
*/
|
||||
@Deprecated
|
||||
public synchronized boolean isDeferredLogFlush() {
|
||||
public synchronized boolean isAsyncLogFlush() {
|
||||
return getDurability() == Durability.ASYNC_WAL;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is used to defer the log edits syncing to the file system. Everytime
|
||||
* This is used to allowing the log edits syncing to the file system. Everytime
|
||||
* an edit is sent to the server it is first sync'd to the file system by the
|
||||
* log writer. This sync is an expensive operation and thus can be deferred so
|
||||
* that the edits are kept in memory for a specified period of time as represented
|
||||
* by <code> hbase.regionserver.optionallogflushinterval </code> and not flushed
|
||||
* for every edit.
|
||||
* that the edits are kept in memory until the background async writer-sync-notifier
|
||||
* threads do the sync and not explicitly flushed for every edit.
|
||||
* <p>
|
||||
* NOTE:- This option might result in data loss if the region server crashes
|
||||
* before these deferred edits in memory are flushed onto the filesystem.
|
||||
* before these pending edits in memory are flushed onto the filesystem.
|
||||
* </p>
|
||||
*
|
||||
* @param isDeferredLogFlush
|
||||
* @deprecated use {@link #setDurability(Durability)}
|
||||
* @param isAsyncLogFlush
|
||||
*/
|
||||
@Deprecated
|
||||
public synchronized void setDeferredLogFlush(final boolean isDeferredLogFlush) {
|
||||
this.setDurability(isDeferredLogFlush ? Durability.ASYNC_WAL : DEFAULT_DURABLITY);
|
||||
public synchronized void setAsyncLogFlush(final boolean isAsyncLogFlush) {
|
||||
this.setDurability(isAsyncLogFlush ? Durability.ASYNC_WAL : DEFAULT_DURABLITY);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -424,7 +424,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
private final MetricsRegion metricsRegion;
|
||||
private final MetricsRegionWrapperImpl metricsRegionWrapper;
|
||||
private final boolean deferredLogSyncDisabled;
|
||||
private final Durability durability;
|
||||
|
||||
/**
|
||||
|
@ -538,9 +537,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
*/
|
||||
this.rowProcessorTimeout = conf.getLong(
|
||||
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
|
||||
// When hbase.regionserver.optionallogflushinterval <= 0 , deferred log sync is disabled.
|
||||
this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval",
|
||||
1 * 1000) <= 0;
|
||||
this.durability = htd.getDurability() == Durability.USE_DEFAULT
|
||||
? DEFAULT_DURABLITY
|
||||
: htd.getDurability();
|
||||
|
@ -5288,7 +5284,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
ClassSize.ARRAY +
|
||||
41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
|
||||
(12 * Bytes.SIZEOF_LONG) +
|
||||
5 * Bytes.SIZEOF_BOOLEAN);
|
||||
4 * Bytes.SIZEOF_BOOLEAN);
|
||||
|
||||
// woefully out of date - currently missing:
|
||||
// 1 x HashMap - coprocessorServiceHandlers
|
||||
|
@ -5770,10 +5766,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// nothing do to
|
||||
break;
|
||||
case ASYNC_WAL:
|
||||
// defer the sync, unless we globally can't
|
||||
if (this.deferredLogSyncDisabled) {
|
||||
this.log.sync(txid);
|
||||
}
|
||||
// nothing do to
|
||||
break;
|
||||
case SYNC_WAL:
|
||||
case FSYNC_WAL:
|
||||
|
@ -5788,8 +5781,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* Check whether we should sync the log from the table's durability settings
|
||||
*/
|
||||
private boolean shouldSyncLog() {
|
||||
return this.deferredLogSyncDisabled ||
|
||||
durability.ordinal() > Durability.ASYNC_WAL.ordinal();
|
||||
return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -122,7 +122,7 @@ class FSHLog implements HLog, Syncable {
|
|||
private final String prefix;
|
||||
private final AtomicLong unflushedEntries = new AtomicLong(0);
|
||||
private final AtomicLong syncedTillHere = new AtomicLong(0);
|
||||
private long lastDeferredTxid;
|
||||
private long lastUnSyncedTxid;
|
||||
private final Path oldLogDir;
|
||||
|
||||
// all writes pending on AsyncWriter/AsyncSyncer thread with
|
||||
|
@ -766,10 +766,10 @@ class FSHLog implements HLog, Syncable {
|
|||
} catch (IOException e) {
|
||||
LOG.error("Failed close of HLog writer", e);
|
||||
int errors = closeErrorCount.incrementAndGet();
|
||||
if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
|
||||
if (errors <= closeErrorsTolerated && !hasUnSyncedEntries()) {
|
||||
LOG.warn("Riding over HLog close failure! error count="+errors);
|
||||
} else {
|
||||
if (hasDeferredEntries()) {
|
||||
if (hasUnSyncedEntries()) {
|
||||
LOG.error("Aborting due to unflushed edits in HLog");
|
||||
}
|
||||
// Failed close of log file. Means we're losing edits. For now,
|
||||
|
@ -1027,8 +1027,8 @@ class FSHLog implements HLog, Syncable {
|
|||
this.numEntries.incrementAndGet();
|
||||
this.asyncWriter.setPendingTxid(txid);
|
||||
|
||||
if (htd.isDeferredLogFlush()) {
|
||||
lastDeferredTxid = txid;
|
||||
if (htd.isAsyncLogFlush()) {
|
||||
lastUnSyncedTxid = txid;
|
||||
}
|
||||
this.latestSequenceNums.put(encodedRegionName, seqNum);
|
||||
}
|
||||
|
@ -1038,7 +1038,7 @@ class FSHLog implements HLog, Syncable {
|
|||
// deferred log flushing
|
||||
if (doSync &&
|
||||
(info.isMetaRegion() ||
|
||||
!htd.isDeferredLogFlush())) {
|
||||
!htd.isAsyncLogFlush())) {
|
||||
// sync txn to file system
|
||||
this.sync(txid);
|
||||
}
|
||||
|
@ -1647,8 +1647,8 @@ class FSHLog implements HLog, Syncable {
|
|||
}
|
||||
|
||||
/** Provide access to currently deferred sequence num for tests */
|
||||
boolean hasDeferredEntries() {
|
||||
return this.lastDeferredTxid > this.syncedTillHere.get();
|
||||
boolean hasUnSyncedEntries() {
|
||||
return this.lastUnSyncedTxid > this.syncedTillHere.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -182,9 +182,6 @@ public class TestFSErrorsExposed {
|
|||
try {
|
||||
// We set it not to run or it will trigger server shutdown while sync'ing
|
||||
// because all the datanodes are bad
|
||||
util.getConfiguration().setInt(
|
||||
"hbase.regionserver.optionallogflushinterval", Integer.MAX_VALUE);
|
||||
|
||||
util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
|
||||
|
||||
util.startMiniCluster(1);
|
||||
|
|
|
@ -3851,8 +3851,6 @@ public class TestHRegion {
|
|||
durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
|
||||
|
||||
// expected cases for async wal
|
||||
// do not sync for deferred flush with large optionallogflushinterval
|
||||
conf.setLong("hbase.regionserver.optionallogflushinterval", Integer.MAX_VALUE);
|
||||
durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
|
||||
durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
|
||||
durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
|
||||
|
@ -3860,8 +3858,6 @@ public class TestHRegion {
|
|||
durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
|
||||
durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
|
||||
|
||||
// now small deferred log flush optionallogflushinterval, expect sync
|
||||
conf.setLong("hbase.regionserver.optionallogflushinterval", 5);
|
||||
durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
|
||||
durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
|
||||
durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
|
||||
|
|
|
@ -62,7 +62,6 @@ public class TestDurability {
|
|||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
CONF = TEST_UTIL.getConfiguration();
|
||||
CONF.setLong("hbase.regionserver.optionallogflushinterval", 500*1000);
|
||||
TEST_UTIL.startMiniDFSCluster(1);
|
||||
|
||||
CLUSTER = TEST_UTIL.getDFSCluster();
|
||||
|
@ -208,10 +207,11 @@ public class TestDurability {
|
|||
}
|
||||
|
||||
// lifted from TestAtomicOperation
|
||||
private HRegion createHRegion (byte [] tableName, String callingMethod, HLog log, boolean isDeferredLogFlush)
|
||||
private HRegion createHRegion (byte [] tableName, String callingMethod,
|
||||
HLog log, boolean isAsyncLogFlush)
|
||||
throws IOException {
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
|
||||
htd.setDeferredLogFlush(isDeferredLogFlush);
|
||||
htd.setAsyncLogFlush(isAsyncLogFlush);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
|
||||
htd.addFamily(hcd);
|
||||
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
|
||||
|
|
|
@ -1074,10 +1074,6 @@ public class TestHLogSplit {
|
|||
@Test (timeout=300000)
|
||||
@Ignore("Need HADOOP-6886, HADOOP-6840, & HDFS-617 for this. HDFS 0.20.205.1+ should have this")
|
||||
public void testLogRollAfterSplitStart() throws IOException {
|
||||
// set flush interval to a large number so it doesn't interrupt us
|
||||
final String F_INTERVAL = "hbase.regionserver.optionallogflushinterval";
|
||||
long oldFlushInterval = conf.getLong(F_INTERVAL, 1000);
|
||||
conf.setLong(F_INTERVAL, 1000*1000*100);
|
||||
HLog log = null;
|
||||
String logName = "testLogRollAfterSplitStart";
|
||||
Path thisTestsDir = new Path(HBASEDIR, logName);
|
||||
|
@ -1127,7 +1123,6 @@ public class TestHLogSplit {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
conf.setLong(F_INTERVAL, oldFlushInterval);
|
||||
if (log != null) {
|
||||
log.close();
|
||||
}
|
||||
|
|
|
@ -87,9 +87,6 @@ public class TestLogRollAbort {
|
|||
// the namenode might still try to choose the recently-dead datanode
|
||||
// for a pipeline, so try to a new pipeline multiple times
|
||||
TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 10);
|
||||
// set periodic sync to 2 min so it doesn't run during test
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.optionallogflushinterval",
|
||||
120 * 1000);
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -125,7 +122,7 @@ public class TestLogRollAbort {
|
|||
String tableName = this.getClass().getSimpleName();
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||
desc.setDeferredLogFlush(true);
|
||||
desc.setAsyncLogFlush(true);
|
||||
|
||||
admin.createTable(desc);
|
||||
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
|
||||
|
@ -155,7 +152,7 @@ public class TestLogRollAbort {
|
|||
log.rollWriter(true);
|
||||
} catch (FailedLogCloseException flce) {
|
||||
assertTrue("Should have deferred flush log edits outstanding",
|
||||
((FSHLog) log).hasDeferredEntries());
|
||||
((FSHLog) log).hasUnSyncedEntries());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -262,7 +262,7 @@ module Hbase
|
|||
htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY]
|
||||
htd.setCompactionEnabled(JBoolean.valueOf(arg[COMPACTION_ENABLED])) if arg[COMPACTION_ENABLED]
|
||||
htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE]
|
||||
htd.setDeferredLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH]
|
||||
htd.setAsyncLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH]
|
||||
htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY]
|
||||
set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
|
||||
set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
|
||||
|
@ -472,7 +472,7 @@ module Hbase
|
|||
htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY]
|
||||
htd.setCompactionEnabled(JBoolean.valueOf(arg[COMPACTION_ENABLED])) if arg[COMPACTION_ENABLED]
|
||||
htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE]
|
||||
htd.setDeferredLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH]
|
||||
htd.setAsyncLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH]
|
||||
htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY]
|
||||
set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
|
||||
set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
|
||||
|
|
Loading…
Reference in New Issue