HBASE-23668 Master log start filling with "Flush journal status" messages
This commit is contained in:
parent
e78ce468d8
commit
1047246717
|
@ -120,6 +120,8 @@ class RegionFlusherAndCompactor implements Closeable {
|
||||||
flushThread.start();
|
flushThread.start();
|
||||||
compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
|
compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
|
||||||
.setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
|
.setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
|
||||||
|
LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, " +
|
||||||
|
"compactMin=", flushSize, flushPerChanges, flushIntervalMs, compactMin);
|
||||||
}
|
}
|
||||||
|
|
||||||
// inject our flush related configurations
|
// inject our flush related configurations
|
||||||
|
@ -130,6 +132,8 @@ class RegionFlusherAndCompactor implements Closeable {
|
||||||
conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
|
conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
|
||||||
long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
|
long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
|
||||||
conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
|
conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
|
||||||
|
LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize,
|
||||||
|
flushPerChanges, flushIntervalMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void compact() {
|
private void compact() {
|
||||||
|
@ -180,6 +184,7 @@ class RegionFlusherAndCompactor implements Closeable {
|
||||||
changesAfterLastFlush.set(0);
|
changesAfterLastFlush.set(0);
|
||||||
try {
|
try {
|
||||||
region.flush(true);
|
region.flush(true);
|
||||||
|
lastFlushTime = EnvironmentEdgeManager.currentTime();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error(HBaseMarkers.FATAL, "Failed to flush procedure store region, aborting...", e);
|
LOG.error(HBaseMarkers.FATAL, "Failed to flush procedure store region, aborting...", e);
|
||||||
abortable.abort("Failed to flush procedure store region", e);
|
abortable.abort("Failed to flush procedure store region", e);
|
||||||
|
@ -207,8 +212,14 @@ class RegionFlusherAndCompactor implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean shouldFlush(long changes) {
|
private boolean shouldFlush(long changes) {
|
||||||
return region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize() >= flushSize ||
|
boolean flush = region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize() >= flushSize ||
|
||||||
changes > flushPerChanges;
|
changes > flushPerChanges;
|
||||||
|
if (flush && LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("shouldFlush memStoreSize={}, flushSize={}, changes={}, flushPerChanges={}",
|
||||||
|
region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize(), flushSize, changes,
|
||||||
|
flushPerChanges);
|
||||||
|
}
|
||||||
|
return flush;
|
||||||
}
|
}
|
||||||
|
|
||||||
void onUpdate() {
|
void onUpdate() {
|
||||||
|
|
|
@ -306,7 +306,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
||||||
if (!fs.exists(procWALDir)) {
|
if (!fs.exists(procWALDir)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOG.info("The old procedure wal directory {} exists, start migrating", procWALDir);
|
LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", procWALDir);
|
||||||
WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
|
WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
|
||||||
store.start(numThreads);
|
store.start(numThreads);
|
||||||
store.recoverLease();
|
store.recoverLease();
|
||||||
|
@ -347,7 +347,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
LOG.info("The max pid is {}, and the max pid of all loaded procedures is {}",
|
LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}",
|
||||||
maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
|
maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
|
||||||
// Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
|
// Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
|
||||||
// anyway, let's do a check here.
|
// anyway, let's do a check here.
|
||||||
|
@ -358,12 +358,13 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
||||||
PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
|
PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
|
||||||
}
|
}
|
||||||
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
|
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
|
||||||
LOG.warn("The max pid is less than the max pid of all loaded procedures");
|
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
|
||||||
}
|
}
|
||||||
if (!fs.delete(procWALDir, true)) {
|
if (!fs.delete(procWALDir, true)) {
|
||||||
throw new IOException("Failed to delete the migrated proc wal directory " + procWALDir);
|
throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
|
||||||
|
procWALDir);
|
||||||
}
|
}
|
||||||
LOG.info("Migration finished");
|
LOG.info("Migration of WALProcedureStore finished");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -382,7 +383,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
||||||
if (conf.get(USE_HSYNC_KEY) != null) {
|
if (conf.get(USE_HSYNC_KEY) != null) {
|
||||||
conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
|
conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
|
||||||
}
|
}
|
||||||
conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT, IntMath.ceilingPowerOfTwo(16 * numThreads));
|
conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT,
|
||||||
|
IntMath.ceilingPowerOfTwo(16 * numThreads));
|
||||||
|
|
||||||
walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
|
walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
|
||||||
walRoller.start();
|
walRoller.start();
|
||||||
|
|
|
@ -2401,7 +2401,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
flushesQueued.reset();
|
flushesQueued.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
status.markComplete("Flush successful");
|
status.markComplete("Flush successful " + fs.toString());
|
||||||
return fs;
|
return fs;
|
||||||
} finally {
|
} finally {
|
||||||
synchronized (writestate) {
|
synchronized (writestate) {
|
||||||
|
|
Loading…
Reference in New Issue