HBASE-1401 close HLog (and open new one) if there hasnt been edits in N minutes/hours
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@776017 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d25c8da619
commit
b4dedf9cbf
|
@ -256,6 +256,8 @@ Release 0.20.0 - Unreleased
|
||||||
HLog#append?)
|
HLog#append?)
|
||||||
HBASE-1429 Allow passing of a configuration object to HTablePool
|
HBASE-1429 Allow passing of a configuration object to HTablePool
|
||||||
HBASE-1432 LuceneDocumentWrapper is not public
|
HBASE-1432 LuceneDocumentWrapper is not public
|
||||||
|
HBASE-1401 close HLog (and open new one) if there hasnt been edits in N
|
||||||
|
minutes/hours
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
HBASE-1412 Change values for delete column and column family in KeyValue
|
HBASE-1412 Change values for delete column and column family in KeyValue
|
||||||
|
|
|
@ -132,7 +132,6 @@ public class HLog implements HConstants, Syncable {
|
||||||
private final AtomicLong logSeqNum = new AtomicLong(0);
|
private final AtomicLong logSeqNum = new AtomicLong(0);
|
||||||
|
|
||||||
private volatile long filenum = -1;
|
private volatile long filenum = -1;
|
||||||
private volatile long old_filenum = -1;
|
|
||||||
|
|
||||||
private final AtomicInteger numEntries = new AtomicInteger(0);
|
private final AtomicInteger numEntries = new AtomicInteger(0);
|
||||||
|
|
||||||
|
@ -274,6 +273,10 @@ public class HLog implements HConstants, Syncable {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public byte [] rollWriter() throws FailedLogCloseException, IOException {
|
public byte [] rollWriter() throws FailedLogCloseException, IOException {
|
||||||
|
// Return if nothing to flush.
|
||||||
|
if (this.writer != null && this.numEntries.get() <= 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
byte [] regionToFlush = null;
|
byte [] regionToFlush = null;
|
||||||
this.cacheFlushLock.lock();
|
this.cacheFlushLock.lock();
|
||||||
try {
|
try {
|
||||||
|
@ -283,9 +286,6 @@ public class HLog implements HConstants, Syncable {
|
||||||
synchronized (updateLock) {
|
synchronized (updateLock) {
|
||||||
// Clean up current writer.
|
// Clean up current writer.
|
||||||
Path oldFile = cleanupCurrentWriter(this.filenum);
|
Path oldFile = cleanupCurrentWriter(this.filenum);
|
||||||
if (this.filenum >= 0) {
|
|
||||||
this.old_filenum = this.filenum;
|
|
||||||
}
|
|
||||||
this.filenum = System.currentTimeMillis();
|
this.filenum = System.currentTimeMillis();
|
||||||
Path newPath = computeFilename(this.filenum);
|
Path newPath = computeFilename(this.filenum);
|
||||||
this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
|
this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
|
||||||
|
|
|
@ -40,28 +40,43 @@ class LogRoller extends Thread implements LogRollListener {
|
||||||
private final ReentrantLock rollLock = new ReentrantLock();
|
private final ReentrantLock rollLock = new ReentrantLock();
|
||||||
private final AtomicBoolean rollLog = new AtomicBoolean(false);
|
private final AtomicBoolean rollLog = new AtomicBoolean(false);
|
||||||
private final HRegionServer server;
|
private final HRegionServer server;
|
||||||
|
private volatile long lastrolltime = System.currentTimeMillis();
|
||||||
|
// Period to roll log.
|
||||||
|
private final long rollperiod;
|
||||||
|
|
||||||
/** @param server */
|
/** @param server */
|
||||||
public LogRoller(final HRegionServer server) {
|
public LogRoller(final HRegionServer server) {
|
||||||
super();
|
super();
|
||||||
this.server = server;
|
this.server = server;
|
||||||
|
this.rollperiod =
|
||||||
|
this.server.conf.getLong("hbase.regionserver.logroll.period", 3600000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while (!server.isStopRequested()) {
|
while (!server.isStopRequested()) {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
boolean periodic = false;
|
||||||
if (!rollLog.get()) {
|
if (!rollLog.get()) {
|
||||||
|
periodic = (now - this.lastrolltime) > this.rollperiod;
|
||||||
|
if (!periodic) {
|
||||||
synchronized (rollLog) {
|
synchronized (rollLog) {
|
||||||
try {
|
try {
|
||||||
rollLog.wait(server.threadWakeFrequency);
|
rollLog.wait(server.threadWakeFrequency);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
continue;
|
// Fall through
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
// Time for periodic roll
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
|
||||||
|
}
|
||||||
|
}
|
||||||
rollLock.lock(); // Don't interrupt us. We're working
|
rollLock.lock(); // Don't interrupt us. We're working
|
||||||
try {
|
try {
|
||||||
|
this.lastrolltime = now;
|
||||||
byte [] regionToFlush = server.getLog().rollWriter();
|
byte [] regionToFlush = server.getLog().rollWriter();
|
||||||
if (regionToFlush != null) {
|
if (regionToFlush != null) {
|
||||||
scheduleFlush(regionToFlush);
|
scheduleFlush(regionToFlush);
|
||||||
|
|
Loading…
Reference in New Issue