HBASE-7869 Provide way to not start LogSyncer thread (Anoop)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1451427 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9051c55542
commit
26d688ff95
|
@ -380,6 +380,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
|
|
||||||
private final MetricsRegion metricsRegion;
|
private final MetricsRegion metricsRegion;
|
||||||
private final MetricsRegionWrapperImpl metricsRegionWrapper;
|
private final MetricsRegionWrapperImpl metricsRegionWrapper;
|
||||||
|
private final boolean deferredLogSyncDisabled;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HRegion constructor. This constructor should only be used for testing and
|
* HRegion constructor. This constructor should only be used for testing and
|
||||||
|
@ -484,7 +485,10 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
*/
|
*/
|
||||||
this.rowProcessorTimeout = conf.getLong(
|
this.rowProcessorTimeout = conf.getLong(
|
||||||
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
|
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
|
||||||
|
// When hbase.regionserver.optionallogflushinterval <= 0 , deferred log sync is disabled.
|
||||||
|
this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval",
|
||||||
|
1 * 1000) <= 0;
|
||||||
|
|
||||||
if (rsServices != null) {
|
if (rsServices != null) {
|
||||||
this.rsAccounting = this.rsServices.getRegionServerAccounting();
|
this.rsAccounting = this.rsServices.getRegionServerAccounting();
|
||||||
// don't initialize coprocessors if not running within a regionserver
|
// don't initialize coprocessors if not running within a regionserver
|
||||||
|
@ -5381,7 +5385,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
*/
|
*/
|
||||||
private void syncOrDefer(long txid) throws IOException {
|
private void syncOrDefer(long txid) throws IOException {
|
||||||
if (this.getRegionInfo().isMetaRegion() ||
|
if (this.getRegionInfo().isMetaRegion() ||
|
||||||
!this.htableDescriptor.isDeferredLogFlush()) {
|
!this.htableDescriptor.isDeferredLogFlush() || this.deferredLogSyncDisabled) {
|
||||||
this.log.sync(txid);
|
this.log.sync(txid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,7 +220,7 @@ class FSHLog implements HLog, Syncable {
|
||||||
/**
|
/**
|
||||||
* Thread that handles optional sync'ing
|
* Thread that handles optional sync'ing
|
||||||
*/
|
*/
|
||||||
private final LogSyncer logSyncerThread;
|
private final LogSyncer logSyncer;
|
||||||
|
|
||||||
/** Number of log close errors tolerated before we abort */
|
/** Number of log close errors tolerated before we abort */
|
||||||
private final int closeErrorsTolerated;
|
private final int closeErrorsTolerated;
|
||||||
|
@ -348,7 +348,7 @@ class FSHLog implements HLog, Syncable {
|
||||||
this.closeErrorsTolerated = conf.getInt(
|
this.closeErrorsTolerated = conf.getInt(
|
||||||
"hbase.regionserver.logroll.errors.tolerated", 0);
|
"hbase.regionserver.logroll.errors.tolerated", 0);
|
||||||
|
|
||||||
this.logSyncerThread = new LogSyncer(this.optionalFlushInterval);
|
this.logSyncer = new LogSyncer(this.optionalFlushInterval);
|
||||||
|
|
||||||
LOG.info("HLog configuration: blocksize=" +
|
LOG.info("HLog configuration: blocksize=" +
|
||||||
StringUtils.byteDesc(this.blocksize) +
|
StringUtils.byteDesc(this.blocksize) +
|
||||||
|
@ -378,8 +378,15 @@ class FSHLog implements HLog, Syncable {
|
||||||
// handle the reflection necessary to call getNumCurrentReplicas()
|
// handle the reflection necessary to call getNumCurrentReplicas()
|
||||||
this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
|
this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
|
||||||
|
|
||||||
Threads.setDaemonThreadRunning(logSyncerThread.getThread(),
|
// When optionalFlushInterval is set as 0, don't start a thread for deferred log sync.
|
||||||
Thread.currentThread().getName() + ".logSyncer");
|
if (this.optionalFlushInterval > 0) {
|
||||||
|
Threads.setDaemonThreadRunning(logSyncer.getThread(), Thread.currentThread().getName()
|
||||||
|
+ ".logSyncer");
|
||||||
|
} else {
|
||||||
|
LOG.info("hbase.regionserver.optionallogflushinterval is set as "
|
||||||
|
+ this.optionalFlushInterval + ". Deferred log syncing won't work. "
|
||||||
|
+ "Any Mutation, marked to be deferred synced, will be flushed immediately.");
|
||||||
|
}
|
||||||
coprocessorHost = new WALCoprocessorHost(this, conf);
|
coprocessorHost = new WALCoprocessorHost(this, conf);
|
||||||
|
|
||||||
this.metrics = new MetricsWAL();
|
this.metrics = new MetricsWAL();
|
||||||
|
@ -797,13 +804,16 @@ class FSHLog implements HLog, Syncable {
|
||||||
if (this.closed) {
|
if (this.closed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
// When optionalFlushInterval is 0, the logSyncer is not started as a Thread.
|
||||||
logSyncerThread.close();
|
if (this.optionalFlushInterval > 0) {
|
||||||
// Make sure we synced everything
|
try {
|
||||||
logSyncerThread.join(this.optionalFlushInterval*2);
|
logSyncer.close();
|
||||||
} catch (InterruptedException e) {
|
// Make sure we synced everything
|
||||||
LOG.error("Exception while waiting for syncer thread to die", e);
|
logSyncer.join(this.optionalFlushInterval * 2);
|
||||||
Thread.currentThread().interrupt();
|
} catch (InterruptedException e) {
|
||||||
|
LOG.error("Exception while waiting for syncer thread to die", e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
// Prevent all further flushing and rolling.
|
// Prevent all further flushing and rolling.
|
||||||
|
@ -1088,9 +1098,9 @@ class FSHLog implements HLog, Syncable {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
doneUpto = this.unflushedEntries.get();
|
doneUpto = this.unflushedEntries.get();
|
||||||
pending = logSyncerThread.getPendingWrites();
|
pending = logSyncer.getPendingWrites();
|
||||||
try {
|
try {
|
||||||
logSyncerThread.hlogFlush(tempWriter, pending);
|
logSyncer.hlogFlush(tempWriter, pending);
|
||||||
} catch(IOException io) {
|
} catch(IOException io) {
|
||||||
ioe = io;
|
ioe = io;
|
||||||
LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
|
LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
|
||||||
|
@ -1101,7 +1111,7 @@ class FSHLog implements HLog, Syncable {
|
||||||
synchronized (flushLock) {
|
synchronized (flushLock) {
|
||||||
// HBASE-4387, HBASE-5623, retry with updateLock held
|
// HBASE-4387, HBASE-5623, retry with updateLock held
|
||||||
tempWriter = this.writer;
|
tempWriter = this.writer;
|
||||||
logSyncerThread.hlogFlush(tempWriter, pending);
|
logSyncer.hlogFlush(tempWriter, pending);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1256,7 +1266,7 @@ class FSHLog implements HLog, Syncable {
|
||||||
// coprocessor hook:
|
// coprocessor hook:
|
||||||
if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
|
if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
|
||||||
// write to our buffer for the Hlog file.
|
// write to our buffer for the Hlog file.
|
||||||
logSyncerThread.append(new FSHLog.Entry(logKey, logEdit));
|
logSyncer.append(new FSHLog.Entry(logKey, logEdit));
|
||||||
}
|
}
|
||||||
long took = EnvironmentEdgeManager.currentTimeMillis() - now;
|
long took = EnvironmentEdgeManager.currentTimeMillis() - now;
|
||||||
coprocessorHost.postWALWrite(info, logKey, logEdit);
|
coprocessorHost.postWALWrite(info, logKey, logEdit);
|
||||||
|
|
Loading…
Reference in New Issue