HBASE-5544 Add metrics to HRegion.processRow() (Scott Chen)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1306648 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4d5ca12a0f
commit
ada9d8d361
|
@ -46,4 +46,8 @@ public abstract class BaseRowProcessor<T> implements RowProcessor<T> {
|
||||||
return HConstants.DEFAULT_CLUSTER_ID;
|
return HConstants.DEFAULT_CLUSTER_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return this.getClass().getSimpleName().toLowerCase();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4264,6 +4264,9 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
public void processRowsWithLocks(RowProcessor<?> processor, long timeout)
|
public void processRowsWithLocks(RowProcessor<?> processor, long timeout)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
|
final long startNanoTime = System.nanoTime();
|
||||||
|
String metricsName = "rowprocessor." + processor.getName();
|
||||||
|
|
||||||
for (byte[] row : processor.getRowsToLock()) {
|
for (byte[] row : processor.getRowsToLock()) {
|
||||||
checkRow(row, "processRowsWithLocks");
|
checkRow(row, "processRowsWithLocks");
|
||||||
}
|
}
|
||||||
|
@ -4285,12 +4288,21 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
doProcessRowWithTimeout(
|
doProcessRowWithTimeout(
|
||||||
processor, now, this, null, null, timeout);
|
processor, now, this, null, null, timeout);
|
||||||
processor.postProcess(this, walEdit);
|
processor.postProcess(this, walEdit);
|
||||||
|
} catch (IOException e) {
|
||||||
|
long endNanoTime = System.nanoTime();
|
||||||
|
HRegion.incrTimeVaryingMetric(metricsName + ".error.nano",
|
||||||
|
endNanoTime - startNanoTime);
|
||||||
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
closeRegionOperation();
|
closeRegionOperation();
|
||||||
}
|
}
|
||||||
|
final long endNanoTime = System.nanoTime();
|
||||||
|
HRegion.incrTimeVaryingMetric(metricsName + ".nano",
|
||||||
|
endNanoTime - startNanoTime);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long lockedNanoTime, processDoneNanoTime, unlockedNanoTime = 0;
|
||||||
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
|
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
|
||||||
boolean locked = false;
|
boolean locked = false;
|
||||||
boolean walSyncSuccessful = false;
|
boolean walSyncSuccessful = false;
|
||||||
|
@ -4313,6 +4325,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
// 3. Region lock
|
// 3. Region lock
|
||||||
this.updatesLock.readLock().lock();
|
this.updatesLock.readLock().lock();
|
||||||
locked = true;
|
locked = true;
|
||||||
|
lockedNanoTime = System.nanoTime();
|
||||||
|
|
||||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
|
@ -4320,6 +4333,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
// waledits
|
// waledits
|
||||||
doProcessRowWithTimeout(
|
doProcessRowWithTimeout(
|
||||||
processor, now, this, mutations, walEdit, timeout);
|
processor, now, this, mutations, walEdit, timeout);
|
||||||
|
processDoneNanoTime = System.nanoTime();
|
||||||
|
|
||||||
if (!mutations.isEmpty()) {
|
if (!mutations.isEmpty()) {
|
||||||
// 5. Get a mvcc write number
|
// 5. Get a mvcc write number
|
||||||
|
@ -4344,6 +4358,8 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
this.updatesLock.readLock().unlock();
|
this.updatesLock.readLock().unlock();
|
||||||
locked = false;
|
locked = false;
|
||||||
}
|
}
|
||||||
|
unlockedNanoTime = System.nanoTime();
|
||||||
|
|
||||||
// 9. Release row lock(s)
|
// 9. Release row lock(s)
|
||||||
if (acquiredLocks != null) {
|
if (acquiredLocks != null) {
|
||||||
for (Integer lid : acquiredLocks) {
|
for (Integer lid : acquiredLocks) {
|
||||||
|
@ -4382,11 +4398,18 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
releaseRowLock(lid);
|
releaseRowLock(lid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
unlockedNanoTime = unlockedNanoTime == 0 ?
|
||||||
|
System.nanoTime() : unlockedNanoTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 12. Run post-process hook
|
// 12. Run post-process hook
|
||||||
processor.postProcess(this, walEdit);
|
processor.postProcess(this, walEdit);
|
||||||
|
|
||||||
|
} catch (IOException e) {
|
||||||
|
long endNanoTime = System.nanoTime();
|
||||||
|
HRegion.incrTimeVaryingMetric(metricsName + ".error.nano",
|
||||||
|
endNanoTime - startNanoTime);
|
||||||
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
closeRegionOperation();
|
closeRegionOperation();
|
||||||
if (!mutations.isEmpty() &&
|
if (!mutations.isEmpty() &&
|
||||||
|
@ -4394,6 +4417,22 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
requestFlush();
|
requestFlush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Populate all metrics
|
||||||
|
long endNanoTime = System.nanoTime();
|
||||||
|
HRegion.incrTimeVaryingMetric(metricsName + ".nano",
|
||||||
|
endNanoTime - startNanoTime);
|
||||||
|
|
||||||
|
HRegion.incrTimeVaryingMetric(metricsName + ".acquirelock.nano",
|
||||||
|
lockedNanoTime - startNanoTime);
|
||||||
|
|
||||||
|
HRegion.incrTimeVaryingMetric(metricsName + ".process.nano",
|
||||||
|
processDoneNanoTime - lockedNanoTime);
|
||||||
|
|
||||||
|
HRegion.incrTimeVaryingMetric(metricsName + ".occupylock.nano",
|
||||||
|
unlockedNanoTime - lockedNanoTime);
|
||||||
|
|
||||||
|
HRegion.incrTimeVaryingMetric(metricsName + ".sync.nano",
|
||||||
|
endNanoTime - unlockedNanoTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doProcessRowWithTimeout(final RowProcessor<?> processor,
|
private void doProcessRowWithTimeout(final RowProcessor<?> processor,
|
||||||
|
@ -4795,8 +4834,9 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
// Request a cache flush. Do it outside update lock.
|
// Request a cache flush. Do it outside update lock.
|
||||||
requestFlush();
|
requestFlush();
|
||||||
}
|
}
|
||||||
if(wrongLength){
|
if (wrongLength) {
|
||||||
throw new IOException("Attempted to increment field that isn't 64 bits wide");
|
throw new IOException(
|
||||||
|
"Attempted to increment field that isn't 64 bits wide");
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -4812,7 +4852,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
throw new NoSuchColumnFamilyException("Column family " +
|
throw new NoSuchColumnFamilyException("Column family " +
|
||||||
Bytes.toString(family) + " does not exist in region " + this
|
Bytes.toString(family) + " does not exist in region " + this
|
||||||
+ " in table " + this.htableDescriptor);
|
+ " in table " + this.htableDescriptor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||||
|
|
|
@ -64,7 +64,7 @@ public interface RowProcessor<T> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HRegion handles the locks and MVCC and invokes this method properly.
|
* HRegion handles the locks and MVCC and invokes this method properly.
|
||||||
*
|
*
|
||||||
* You should override this to create your own RowProcessor.
|
* You should override this to create your own RowProcessor.
|
||||||
*
|
*
|
||||||
* If you are doing read-modify-write here, you should consider using
|
* If you are doing read-modify-write here, you should consider using
|
||||||
|
@ -103,4 +103,9 @@ public interface RowProcessor<T> {
|
||||||
*/
|
*/
|
||||||
UUID getClusterId();
|
UUID getClusterId();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Human readable name of the processor
|
||||||
|
* @return The name of the processor
|
||||||
|
*/
|
||||||
|
String getName();
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,7 +138,7 @@ public class TestRowProcessorEndpoint {
|
||||||
prepareTestData();
|
prepareTestData();
|
||||||
RowProcessorProtocol protocol =
|
RowProcessorProtocol protocol =
|
||||||
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||||
RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
|
RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
|
||||||
new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
|
new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
|
||||||
Set<String> result = protocol.process(processor);
|
Set<String> result = protocol.process(processor);
|
||||||
|
|
||||||
|
@ -176,7 +176,7 @@ public class TestRowProcessorEndpoint {
|
||||||
private int incrementCounter(HTable table) throws Throwable {
|
private int incrementCounter(HTable table) throws Throwable {
|
||||||
RowProcessorProtocol protocol =
|
RowProcessorProtocol protocol =
|
||||||
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||||
RowProcessorEndpoint.IncrementCounterProcessor processor =
|
RowProcessorEndpoint.IncrementCounterProcessor processor =
|
||||||
new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
|
new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
|
||||||
int counterValue = protocol.process(processor);
|
int counterValue = protocol.process(processor);
|
||||||
return counterValue;
|
return counterValue;
|
||||||
|
@ -234,7 +234,7 @@ public class TestRowProcessorEndpoint {
|
||||||
private void swapRows(HTable table) throws Throwable {
|
private void swapRows(HTable table) throws Throwable {
|
||||||
RowProcessorProtocol protocol =
|
RowProcessorProtocol protocol =
|
||||||
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||||
RowProcessorEndpoint.RowSwapProcessor processor =
|
RowProcessorEndpoint.RowSwapProcessor processor =
|
||||||
new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
|
new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
|
||||||
protocol.process(processor);
|
protocol.process(processor);
|
||||||
}
|
}
|
||||||
|
@ -244,7 +244,7 @@ public class TestRowProcessorEndpoint {
|
||||||
prepareTestData();
|
prepareTestData();
|
||||||
RowProcessorProtocol protocol =
|
RowProcessorProtocol protocol =
|
||||||
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||||
RowProcessorEndpoint.TimeoutProcessor processor =
|
RowProcessorEndpoint.TimeoutProcessor processor =
|
||||||
new RowProcessorEndpoint.TimeoutProcessor(ROW);
|
new RowProcessorEndpoint.TimeoutProcessor(ROW);
|
||||||
boolean exceptionCaught = false;
|
boolean exceptionCaught = false;
|
||||||
try {
|
try {
|
||||||
|
@ -510,13 +510,18 @@ public class TestRowProcessorEndpoint {
|
||||||
Bytes.writeByteArray(out, row1);
|
Bytes.writeByteArray(out, row1);
|
||||||
Bytes.writeByteArray(out, row2);
|
Bytes.writeByteArray(out, row2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return "swap";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TimeoutProcessor extends
|
public static class TimeoutProcessor extends
|
||||||
BaseRowProcessor<Void> implements Writable {
|
BaseRowProcessor<Void> implements Writable {
|
||||||
|
|
||||||
byte[] row = new byte[0];
|
byte[] row = new byte[0];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Empty constructor for Writable
|
* Empty constructor for Writable
|
||||||
*/
|
*/
|
||||||
|
@ -556,6 +561,11 @@ public class TestRowProcessorEndpoint {
|
||||||
public void write(DataOutput out) throws IOException {
|
public void write(DataOutput out) throws IOException {
|
||||||
Bytes.writeByteArray(out, row);
|
Bytes.writeByteArray(out, row);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return "timeout";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void doScan(
|
public static void doScan(
|
||||||
|
|
Loading…
Reference in New Issue