HBASE-12424 Finer grained logging and metrics for split transactions

This commit is contained in:
Andrew Purtell 2014-11-07 11:31:42 -08:00
parent bbd6815414
commit 7718390703
8 changed files with 193 additions and 30 deletions

View File

@ -112,6 +112,18 @@ public interface MetricsRegionServerSource extends BaseSource {
*/
void incrSlowAppend();
/**
* Update the split transaction time histogram
* @param t time it took, in milliseconds
*/
void updateSplitTime(long t);
/**
* Update the flush time histogram
* @param t time it took, in milliseconds
*/
void updateFlushTime(long t);
// Strings used for exporting to metrics system.
String REGION_COUNT = "regionCount";
String REGION_COUNT_DESC = "Number of regions";
@ -239,4 +251,7 @@ public interface MetricsRegionServerSource extends BaseSource {
String HEDGED_READ_WINS = "hedgedReadWins";
String HEDGED_READ_WINS_DESC =
"The number of times we started a hedged read and a hedged read won";
String SPLIT_KEY = "splitTime";
String FLUSH_KEY = "flushTime";
}

View File

@ -35,8 +35,6 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
public class MetricsRegionServerSourceImpl
extends BaseSourceImpl implements MetricsRegionServerSource {
final MetricsRegionServerWrapper rsWrap;
private final MetricHistogram putHisto;
private final MetricHistogram deleteHisto;
@ -51,6 +49,8 @@ public class MetricsRegionServerSourceImpl
private final MutableCounterLong slowIncrement;
private final MutableCounterLong slowAppend;
private final MetricHistogram splitTimeHisto;
private final MetricHistogram flushTimeHisto;
public MetricsRegionServerSourceImpl(MetricsRegionServerWrapper rsWrap) {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, rsWrap);
@ -80,6 +80,9 @@ public class MetricsRegionServerSourceImpl
slowAppend = getMetricsRegistry().newCounter(SLOW_APPEND_KEY, SLOW_APPEND_DESC, 0l);
replayHisto = getMetricsRegistry().newHistogram(REPLAY_KEY);
splitTimeHisto = getMetricsRegistry().newHistogram(SPLIT_KEY);
flushTimeHisto = getMetricsRegistry().newHistogram(FLUSH_KEY);
}
@Override
@ -137,6 +140,16 @@ public class MetricsRegionServerSourceImpl
slowAppend.incr();
}
@Override
public void updateSplitTime(long t) {
splitTimeHisto.add(t);
}
@Override
public void updateFlushTime(long t) {
flushTimeHisto.add(t);
}
/**
* Yes this is a get function that doesn't return anything. Thanks Hadoop for breaking all
* expectations of java programmers. Instead of returning anything Hadoop metrics expects

View File

@ -422,18 +422,30 @@ class MemStoreFlusher implements FlushRequester {
* not flushed.
*/
private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
long startTime = 0;
synchronized (this.regionsInQueue) {
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
// Use the start time of the FlushRegionEntry if available
if (fqe != null) {
startTime = fqe.createTime;
}
if (fqe != null && emergencyFlush) {
// Need to remove from region from delay queue. When NOT an
// emergencyFlush, then item was removed via a flushQueue.poll.
flushQueue.remove(fqe);
}
}
if (startTime == 0) {
// Avoid getting the system time unless we don't have a FlushRegionEntry;
// shame we can't capture the time also spent in the above synchronized
// block
startTime = EnvironmentEdgeManager.currentTime();
}
lock.readLock().lock();
try {
notifyFlushRequest(region, emergencyFlush);
boolean shouldCompact = region.flushcache().isCompactionNeeded();
HRegion.FlushResult flushResult = region.flushcache();
boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
if (shouldSplit) {
@ -442,7 +454,10 @@ class MemStoreFlusher implements FlushRequester {
server.compactSplitThread.requestSystemCompaction(
region, Thread.currentThread().getName());
}
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTime();
server.metricsRegionServer.updateFlushTime(endTime - startTime);
}
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of hlog

View File

@ -94,4 +94,12 @@ public class MetricsRegionServer {
public void updateReplay(long t){
serverSource.updateReplay(t);
}
public void updateSplitTime(long t) {
serverSource.updateSplitTime(t);
}
public void updateFlushTime(long t) {
serverSource.updateFlushTime(t);
}
}

View File

@ -1080,7 +1080,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
if (shouldFlush) {
boolean result = region.flushcache().isCompactionNeeded();
long startTime = EnvironmentEdgeManager.currentTime();
HRegion.FlushResult flushResult = region.flushcache();
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTime();
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
boolean result = flushResult.isCompactionNeeded();
if (result) {
regionServer.compactSplitThread.requestSystemCompaction(region,
"Compaction through user triggered flush");
@ -1210,8 +1216,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionB.startRegionOperation(Operation.MERGE_REGION);
LOG.info("Receiving merging request for " + regionA + ", " + regionB
+ ",forcible=" + forcible);
regionA.flushcache();
regionB.flushcache();
long startTime = EnvironmentEdgeManager.currentTime();
HRegion.FlushResult flushResult = regionA.flushcache();
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTime();
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
startTime = EnvironmentEdgeManager.currentTime();
flushResult = regionB.flushcache();
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTime();
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
return MergeRegionsResponse.newBuilder().build();
} catch (IOException ie) {
@ -1550,7 +1566,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
HRegion region = getRegion(request.getRegion());
region.startRegionOperation(Operation.SPLIT_REGION);
LOG.info("Splitting " + region.getRegionNameAsString());
region.flushcache();
long startTime = EnvironmentEdgeManager.currentTime();
HRegion.FlushResult flushResult = region.flushcache();
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTime();
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
byte[] splitPoint = null;
if (request.hasSplitPoint()) {
splitPoint = request.getSplitPoint().toByteArray();

View File

@ -567,6 +567,7 @@ public class RegionCoprocessorHost
* Invoked just before a split
* @throws IOException
*/
// TODO: Deprecate this
public void preSplit() throws IOException {
execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
@Override

View File

@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
@ -60,10 +61,10 @@ class SplitRequest implements Runnable {
this.server.isStopping() + " or stopped=" + this.server.isStopped());
return;
}
boolean success = false;
long startTime = EnvironmentEdgeManager.currentTime();
SplitTransaction st = new SplitTransaction(parent, midKey);
try {
final long startTime = System.currentTimeMillis();
SplitTransaction st = new SplitTransaction(parent, midKey);
//acquire a shared read lock on the table, so that table schema modifications
//do not happen concurrently
tableLock = server.getTableLockManager().readLock(parent.getTableDesc().getTableName()
@ -80,6 +81,7 @@ class SplitRequest implements Runnable {
if (!st.prepare()) return;
try {
st.execute(this.server, this.server);
success = true;
} catch (Exception e) {
if (this.server.isStopping() || this.server.isStopped()) {
LOG.info(
@ -106,11 +108,6 @@ class SplitRequest implements Runnable {
}
return;
}
LOG.info("Region split, hbase:meta updated, and report to master. Parent="
+ parent.getRegionNameAsString() + ", new regions: "
+ st.getFirstDaughter().getRegionNameAsString() + ", "
+ st.getSecondDaughter().getRegionNameAsString() + ". Split took "
+ StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
} catch (IOException ex) {
ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
LOG.error("Split failed " + this, ex);
@ -128,6 +125,19 @@ class SplitRequest implements Runnable {
parent.clearSplit();
}
releaseTableLock();
long endTime = EnvironmentEdgeManager.currentTime();
// Update regionserver metrics with the split transaction total running time
server.metricsRegionServer.updateSplitTime(endTime - startTime);
if (success) {
// Log success
LOG.info("Region split, hbase:meta updated, and report to master. Parent="
+ parent.getRegionNameAsString() + ", new regions: "
+ st.getFirstDaughter().getRegionNameAsString() + ", "
+ st.getSecondDaughter().getRegionNameAsString() + ". Split took "
+ StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTime(), startTime));
}
// Always log the split transaction journal
LOG.info("Split transaction journal:\n\t" + StringUtils.join("\n\t", st.getJournal()));
}
}

View File

@ -96,7 +96,23 @@ public class SplitTransaction {
* Each enum is a step in the split transaction. Used to figure how much
* we need to rollback.
*/
enum JournalEntry {
static enum JournalEntryType {
/**
* Started
*/
STARTED,
/**
* Prepared (after table lock)
*/
PREPARED,
/**
* Before preSplit coprocessor hook
*/
BEFORE_PRE_SPLIT_HOOK,
/**
* After preSplit coprocessor hook
*/
AFTER_PRE_SPLIT_HOOK,
/**
* Set region as in transition, set it into SPLITTING state.
*/
@ -121,6 +137,22 @@ public class SplitTransaction {
* Started in on the creation of the second daughter region.
*/
STARTED_REGION_B_CREATION,
/**
* Opened the first daughter region
*/
OPENED_REGION_A,
/**
* Opened the second daughter region
*/
OPENED_REGION_B,
/**
* Before postSplit coprocessor hook
*/
BEFORE_POST_SPLIT_HOOK,
/**
* After postSplit coprocessor hook
*/
AFTER_POST_SPLIT_HOOK,
/**
* Point of no return.
* If we got here, then transaction is not recoverable other than by
@ -129,6 +161,29 @@ public class SplitTransaction {
PONR
}
static class JournalEntry {
public JournalEntryType type;
public long timestamp;
public JournalEntry(JournalEntryType type) {
this(type, EnvironmentEdgeManager.currentTime());
}
public JournalEntry(JournalEntryType type, long timestamp) {
this.type = type;
this.timestamp = timestamp;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(type);
sb.append(" at ");
sb.append(timestamp);
return sb.toString();
}
}
/*
* Journal of how far the split transaction has progressed.
*/
@ -142,6 +197,7 @@ public class SplitTransaction {
public SplitTransaction(final HRegion r, final byte [] splitrow) {
this.parent = r;
this.splitrow = splitrow;
this.journal.add(new JournalEntry(JournalEntryType.STARTED));
}
/**
@ -167,6 +223,7 @@ public class SplitTransaction {
long rid = getDaughterRegionIdTimestamp(hri);
this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
this.journal.add(new JournalEntry(JournalEntryType.PREPARED));
return true;
}
@ -209,16 +266,17 @@ public class SplitTransaction {
assert !this.parent.lock.writeLock().isHeldByCurrentThread():
"Unsafe to hold write lock while performing RPCs";
// Coprocessor callback
if (this.parent.getCoprocessorHost() != null) {
this.parent.getCoprocessorHost().preSplit();
}
journal.add(new JournalEntry(JournalEntryType.BEFORE_PRE_SPLIT_HOOK));
// Coprocessor callback
if (this.parent.getCoprocessorHost() != null) {
// TODO: Remove one of these
this.parent.getCoprocessorHost().preSplit();
this.parent.getCoprocessorHost().preSplit(this.splitrow);
}
journal.add(new JournalEntry(JournalEntryType.AFTER_PRE_SPLIT_HOOK));
// If true, no cluster to write meta edits to or to update znodes in.
boolean testing = server == null? true:
server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
@ -261,7 +319,7 @@ public class SplitTransaction {
// OfflineParentInMeta timeout,this will cause regionserver exit,and then
// master ServerShutdownHandler will fix daughter & avoid data loss. (See
// HBase-4562).
this.journal.add(JournalEntry.PONR);
this.journal.add(new JournalEntry(JournalEntryType.PONR));
// Edit parent in meta. Offlines parent region and adds splita and splitb
// as an atomic update. See HBASE-7721. This update to META makes the region
@ -284,10 +342,10 @@ public class SplitTransaction {
throw new IOException("Failed to get ok from master to split "
+ parent.getRegionNameAsString());
}
this.journal.add(JournalEntry.SET_SPLITTING);
this.journal.add(new JournalEntry(JournalEntryType.SET_SPLITTING));
this.parent.getRegionFileSystem().createSplitsDir();
this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
this.journal.add(new JournalEntry(JournalEntryType.CREATE_SPLIT_DIR));
Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
Exception exceptionToThrow = null;
@ -305,7 +363,7 @@ public class SplitTransaction {
exceptionToThrow = closedByOtherException;
}
if (exceptionToThrow != closedByOtherException) {
this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
this.journal.add(new JournalEntry(JournalEntryType.CLOSED_PARENT_REGION));
}
if (exceptionToThrow != null) {
if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
@ -314,7 +372,7 @@ public class SplitTransaction {
if (!testing) {
services.removeFromOnlineRegions(this.parent, null);
}
this.journal.add(JournalEntry.OFFLINED_PARENT);
this.journal.add(new JournalEntry(JournalEntryType.OFFLINED_PARENT));
// TODO: If splitStoreFiles were multithreaded would we complete steps in
// less elapsed time? St.Ack 20100920
@ -328,11 +386,11 @@ public class SplitTransaction {
// region. We could fail halfway through. If we do, we could have left
// stuff in fs that needs cleanup -- a storefile or two. Thats why we
// add entry to journal BEFORE rather than AFTER the change.
this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_A_CREATION));
HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
// Ditto
this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_B_CREATION));
HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
return new PairOfSameType<HRegion>(a, b);
}
@ -366,7 +424,13 @@ public class SplitTransaction {
bOpener.start();
try {
aOpener.join();
if (aOpener.getException() == null) {
journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_A));
}
bOpener.join();
if (bOpener.getException() == null) {
journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_B));
}
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
@ -415,10 +479,12 @@ public class SplitTransaction {
final RegionServerServices services, PairOfSameType<HRegion> regions)
throws IOException {
openDaughters(server, services, regions.getFirst(), regions.getSecond());
journal.add(new JournalEntry(JournalEntryType.BEFORE_POST_SPLIT_HOOK));
// Coprocessor callback
if (parent.getCoprocessorHost() != null) {
parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
}
journal.add(new JournalEntry(JournalEntryType.AFTER_POST_SPLIT_HOOK));
return regions;
}
@ -495,7 +561,7 @@ public class SplitTransaction {
@Override
public boolean progress() {
long now = System.currentTimeMillis();
long now = EnvironmentEdgeManager.currentTime();
if (now - lastLog > this.interval) {
LOG.info("Opening " + this.hri.getRegionNameAsString());
this.lastLog = now;
@ -616,7 +682,7 @@ public class SplitTransaction {
// Iterate in reverse.
while (iterator.hasPrevious()) {
JournalEntry je = iterator.previous();
switch(je) {
switch(je.type) {
case SET_SPLITTING:
if (services != null
@ -665,6 +731,17 @@ public class SplitTransaction {
// See HBASE-3872.
return false;
// Informational only cases
case STARTED:
case PREPARED:
case BEFORE_PRE_SPLIT_HOOK:
case AFTER_PRE_SPLIT_HOOK:
case BEFORE_POST_SPLIT_HOOK:
case AFTER_POST_SPLIT_HOOK:
case OPENED_REGION_A:
case OPENED_REGION_B:
break;
default:
throw new RuntimeException("Unhandled journal entry: " + je);
}
@ -684,4 +761,7 @@ public class SplitTransaction {
return hri_b;
}
List<JournalEntry> getJournal() {
return journal;
}
}