HBASE-25455 Add trace support for HRegion read/write operation (#2861)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
ae2c62ffaa
commit
03e12bfa4a
|
@ -103,8 +103,7 @@ class AsyncRegionLocator {
|
|||
CompletableFuture<T> future = action.get();
|
||||
FutureUtils.addListener(future, (resp, error) -> {
|
||||
if (error != null) {
|
||||
span.recordException(error);
|
||||
span.setStatus(StatusCode.ERROR);
|
||||
TraceUtil.setError(span, error);
|
||||
} else {
|
||||
List<String> regionNames = getRegionNames.apply(resp);
|
||||
if (!regionNames.isEmpty()) {
|
||||
|
|
|
@ -424,8 +424,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
onCallFinished(call, hrc, addr, callback);
|
||||
} finally {
|
||||
if (hrc.failed()) {
|
||||
span.setStatus(StatusCode.ERROR);
|
||||
span.recordException(hrc.getFailed());
|
||||
TraceUtil.setError(span, hrc.getFailed());
|
||||
} else {
|
||||
span.setStatus(StatusCode.OK);
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import io.opentelemetry.api.trace.Tracer;
|
|||
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Supplier;
|
||||
|
@ -58,6 +59,9 @@ public final class TraceUtil {
|
|||
|
||||
public static final AttributeKey<Long> REMOTE_PORT_KEY = SemanticAttributes.NET_PEER_PORT;
|
||||
|
||||
public static final AttributeKey<Boolean> ROW_LOCK_READ_LOCK_KEY =
|
||||
AttributeKey.booleanKey("db.hbase.rowlock.readlock");
|
||||
|
||||
private TraceUtil() {
|
||||
}
|
||||
|
||||
|
@ -139,14 +143,18 @@ public final class TraceUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public static void setError(Span span, Throwable error) {
|
||||
span.recordException(error);
|
||||
span.setStatus(StatusCode.ERROR);
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish the {@code span} when the given {@code future} is completed.
|
||||
*/
|
||||
private static void endSpan(CompletableFuture<?> future, Span span) {
|
||||
FutureUtils.addListener(future, (resp, error) -> {
|
||||
if (error != null) {
|
||||
span.recordException(error);
|
||||
span.setStatus(StatusCode.ERROR);
|
||||
setError(span, error);
|
||||
} else {
|
||||
span.setStatus(StatusCode.OK);
|
||||
}
|
||||
|
@ -164,8 +172,32 @@ public final class TraceUtil {
|
|||
action.run();
|
||||
span.setStatus(StatusCode.OK);
|
||||
} catch (Throwable e) {
|
||||
span.recordException(e);
|
||||
span.setStatus(StatusCode.ERROR);
|
||||
setError(span, e);
|
||||
throw e;
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public interface IOExceptionCallable<V> {
|
||||
V call() throws IOException;
|
||||
}
|
||||
|
||||
public static <T> T trace(IOExceptionCallable<T> callable, String spanName) throws IOException {
|
||||
return trace(callable, () -> createSpan(spanName));
|
||||
}
|
||||
|
||||
public static <T> T trace(IOExceptionCallable<T> callable, Supplier<Span> creator)
|
||||
throws IOException {
|
||||
Span span = creator.get();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
T ret = callable.call();
|
||||
span.setStatus(StatusCode.OK);
|
||||
return ret;
|
||||
} catch (Throwable e) {
|
||||
setError(span, e);
|
||||
throw e;
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
|
|
|
@ -135,12 +135,10 @@ public class CallRunner {
|
|||
resultPair = this.rpcServer.call(call, this.status);
|
||||
} catch (TimeoutIOException e){
|
||||
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
|
||||
span.recordException(e);
|
||||
span.setStatus(StatusCode.ERROR);
|
||||
TraceUtil.setError(span, e);
|
||||
return;
|
||||
} catch (Throwable e) {
|
||||
span.recordException(e);
|
||||
span.setStatus(StatusCode.ERROR);
|
||||
TraceUtil.setError(span, e);
|
||||
if (e instanceof ServerNotRunningYetException) {
|
||||
// If ServerNotRunningYetException, don't spew stack trace.
|
||||
if (RpcServer.LOG.isTraceEnabled()) {
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
|||
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -232,8 +233,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
|
|||
}
|
||||
if (t != null) {
|
||||
this.isError = true;
|
||||
span.recordException(t);
|
||||
span.setStatus(StatusCode.ERROR);
|
||||
TraceUtil.setError(span, t);
|
||||
} else {
|
||||
span.setStatus(StatusCode.OK);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
|
|||
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
@ -3125,24 +3124,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
private RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners,
|
||||
long nonceGroup, long nonce) throws IOException {
|
||||
startRegionOperation(Operation.SCAN);
|
||||
try {
|
||||
// Verify families are all valid
|
||||
if (!scan.hasFamilies()) {
|
||||
// Adding all families to scanner
|
||||
for (byte[] family : this.htableDescriptor.getColumnFamilyNames()) {
|
||||
scan.addFamily(family);
|
||||
}
|
||||
} else {
|
||||
for (byte[] family : scan.getFamilyMap().keySet()) {
|
||||
checkFamily(family);
|
||||
long nonceGroup, long nonce) throws IOException {
|
||||
return TraceUtil.trace(() -> {
|
||||
startRegionOperation(Operation.SCAN);
|
||||
try {
|
||||
// Verify families are all valid
|
||||
if (!scan.hasFamilies()) {
|
||||
// Adding all families to scanner
|
||||
for (byte[] family : this.htableDescriptor.getColumnFamilyNames()) {
|
||||
scan.addFamily(family);
|
||||
}
|
||||
} else {
|
||||
for (byte[] family : scan.getFamilyMap().keySet()) {
|
||||
checkFamily(family);
|
||||
}
|
||||
}
|
||||
return instantiateRegionScanner(scan, additionalScanners, nonceGroup, nonce);
|
||||
} finally {
|
||||
closeRegionOperation(Operation.SCAN);
|
||||
}
|
||||
return instantiateRegionScanner(scan, additionalScanners, nonceGroup, nonce);
|
||||
} finally {
|
||||
closeRegionOperation(Operation.SCAN);
|
||||
}
|
||||
}, () -> createRegionSpan("Region.getScanner"));
|
||||
}
|
||||
|
||||
protected RegionScannerImpl instantiateRegionScanner(Scan scan,
|
||||
|
@ -3179,15 +3180,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
@Override
|
||||
public void delete(Delete delete) throws IOException {
|
||||
checkReadOnly();
|
||||
checkResources();
|
||||
startRegionOperation(Operation.DELETE);
|
||||
try {
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
mutate(delete);
|
||||
} finally {
|
||||
closeRegionOperation(Operation.DELETE);
|
||||
}
|
||||
TraceUtil.trace(() -> {
|
||||
checkReadOnly();
|
||||
checkResources();
|
||||
startRegionOperation(Operation.DELETE);
|
||||
try {
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
return mutate(delete);
|
||||
} finally {
|
||||
closeRegionOperation(Operation.DELETE);
|
||||
}
|
||||
}, () -> createRegionSpan("Region.delete"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3257,20 +3260,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
@Override
|
||||
public void put(Put put) throws IOException {
|
||||
checkReadOnly();
|
||||
TraceUtil.trace(() -> {
|
||||
checkReadOnly();
|
||||
|
||||
// Do a rough check that we have resources to accept a write. The check is
|
||||
// 'rough' in that between the resource check and the call to obtain a
|
||||
// read lock, resources may run out. For now, the thought is that this
|
||||
// will be extremely rare; we'll deal with it when it happens.
|
||||
checkResources();
|
||||
startRegionOperation(Operation.PUT);
|
||||
try {
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
mutate(put);
|
||||
} finally {
|
||||
closeRegionOperation(Operation.PUT);
|
||||
}
|
||||
// Do a rough check that we have resources to accept a write. The check is
|
||||
// 'rough' in that between the resource check and the call to obtain a
|
||||
// read lock, resources may run out. For now, the thought is that this
|
||||
// will be extremely rare; we'll deal with it when it happens.
|
||||
checkResources();
|
||||
startRegionOperation(Operation.PUT);
|
||||
try {
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
return mutate(put);
|
||||
} finally {
|
||||
closeRegionOperation(Operation.PUT);
|
||||
}
|
||||
}, () -> createRegionSpan("Region.put"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3551,7 +3556,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
boolean throwException = false;
|
||||
try {
|
||||
// if atomic then get exclusive lock, else shared lock
|
||||
rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
|
||||
rowLock = region.getRowLock(mutation.getRow(), !isAtomic(), prevRowLock);
|
||||
} catch (TimeoutIOException | InterruptedIOException e) {
|
||||
// NOTE: We will retry when other exceptions, but we should stop if we receive
|
||||
// TimeoutIOException or InterruptedIOException as operation has timed out or
|
||||
|
@ -4330,7 +4335,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
this.checkAndPrepareMutation(cpMutation, timestamp);
|
||||
|
||||
// Acquire row locks. If not, the whole batch will fail.
|
||||
acquiredRowLocks.add(region.getRowLockInternal(cpMutation.getRow(), true, null));
|
||||
acquiredRowLocks.add(region.getRowLock(cpMutation.getRow(), true, null));
|
||||
|
||||
// Returned mutations from coprocessor correspond to the Mutation at index i. We can
|
||||
// directly add the cells from those mutations to the familyMaps of this mutation.
|
||||
|
@ -4501,7 +4506,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic) throws IOException {
|
||||
return batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
return TraceUtil.trace(
|
||||
() -> batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
() -> createRegionSpan("Region.batchMutate"));
|
||||
}
|
||||
|
||||
public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
|
||||
|
@ -4783,6 +4790,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate, long nonceGroup,
|
||||
long nonce) throws IOException {
|
||||
return TraceUtil.trace(() -> checkAndMutateInternal(checkAndMutate, nonceGroup, nonce),
|
||||
() -> createRegionSpan("Region.checkAndMutate"));
|
||||
}
|
||||
|
||||
private CheckAndMutateResult checkAndMutateInternal(CheckAndMutate checkAndMutate,
|
||||
long nonceGroup, long nonce) throws IOException {
|
||||
byte[] row = checkAndMutate.getRow();
|
||||
Filter filter = null;
|
||||
byte[] family = null;
|
||||
|
@ -4831,7 +4844,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
// Lock row - note that doBatchMutate will relock this row if called
|
||||
checkRow(row, "doCheckAndRowMutate");
|
||||
RowLock rowLock = getRowLockInternal(get.getRow(), false, null);
|
||||
RowLock rowLock = getRowLock(get.getRow(), false, null);
|
||||
try {
|
||||
if (this.getCoprocessorHost() != null) {
|
||||
CheckAndMutateResult result =
|
||||
|
@ -4841,7 +4854,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
|
||||
// NOTE: We used to wait here until mvcc caught up: mvcc.await();
|
||||
// NOTE: We used to wait here until mvcc caught up: mvcc.await();
|
||||
// Supposition is that now all changes are done under row locks, then when we go to read,
|
||||
// we'll get the latest on this row.
|
||||
List<Cell> result = get(get, false);
|
||||
|
@ -4883,7 +4896,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// And else 'delete' is not needed since it already does a second get, and sets the
|
||||
// timestamp from get (see prepareDeleteTimestamps).
|
||||
} else {
|
||||
for (Mutation m: rowMutations.getMutations()) {
|
||||
for (Mutation m : rowMutations.getMutations()) {
|
||||
if (m instanceof Put) {
|
||||
updateCellTimestamps(m.getFamilyCellMap().values(), byteTs);
|
||||
}
|
||||
|
@ -4911,8 +4924,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
|
||||
private void checkMutationType(final Mutation mutation)
|
||||
throws DoNotRetryIOException {
|
||||
private void checkMutationType(final Mutation mutation) throws DoNotRetryIOException {
|
||||
if (!(mutation instanceof Put) && !(mutation instanceof Delete) &&
|
||||
!(mutation instanceof Increment) && !(mutation instanceof Append)) {
|
||||
throw new org.apache.hadoop.hbase.DoNotRetryIOException(
|
||||
|
@ -6562,11 +6574,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
@Override
|
||||
public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
|
||||
checkRow(row, "row lock");
|
||||
return getRowLockInternal(row, readLock, null);
|
||||
return getRowLock(row, readLock, null);
|
||||
}
|
||||
|
||||
protected RowLock getRowLockInternal(byte[] row, boolean readLock, final RowLock prevRowLock)
|
||||
throws IOException {
|
||||
Span createRegionSpan(String name) {
|
||||
return TraceUtil.createSpan(name).setAttribute(TraceUtil.REGION_NAMES_KEY,
|
||||
Arrays.asList(getRegionInfo().getRegionNameAsString()));
|
||||
}
|
||||
|
||||
// will be override in tests
|
||||
protected RowLock getRowLockInternal(byte[] row, boolean readLock, RowLock prevRowLock)
|
||||
throws IOException {
|
||||
// create an object to use a a key in the row lock map
|
||||
HashedBytes rowKey = new HashedBytes(row);
|
||||
|
||||
|
@ -6574,9 +6592,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
RowLockImpl result = null;
|
||||
|
||||
boolean success = false;
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("HRegion.getRowLock").startSpan();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
span.addEvent("Getting a " + (readLock ? "readLock" : "writeLock"));
|
||||
try {
|
||||
// Keep trying until we have a lock or error out.
|
||||
// TODO: do we need to add a time component here?
|
||||
while (result == null) {
|
||||
|
@ -6613,7 +6629,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
|
||||
span.addEvent("Failed to get row lock");
|
||||
String message = "Timed out waiting for lock for row: " + rowKey + " in region "
|
||||
+ getRegionInfo().getEncodedName();
|
||||
if (reachDeadlineFirst) {
|
||||
|
@ -6631,7 +6646,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
LOG.debug("Thread interrupted waiting for lock on row: {}, in region {}", rowKey,
|
||||
getRegionInfo().getRegionNameAsString());
|
||||
}
|
||||
span.addEvent("Interrupted exception getting row lock");
|
||||
throw throwOnInterrupt(ie);
|
||||
} catch (Error error) {
|
||||
// The maximum lock count for read lock is 64K (hardcoded), when this maximum count
|
||||
|
@ -6640,17 +6654,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
LOG.warn("Error to get row lock for {}, in region {}, cause: {}", Bytes.toStringBinary(row),
|
||||
getRegionInfo().getRegionNameAsString(), error);
|
||||
IOException ioe = new IOException(error);
|
||||
span.addEvent("Error getting row lock");
|
||||
throw ioe;
|
||||
} finally {
|
||||
// Clean up the counts just in case this was the thing keeping the context alive.
|
||||
if (!success && rowLockContext != null) {
|
||||
rowLockContext.cleanUp();
|
||||
}
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
private RowLock getRowLock(byte[] row, boolean readLock, final RowLock prevRowLock)
|
||||
throws IOException {
|
||||
return TraceUtil.trace(() -> getRowLockInternal(row, readLock, prevRowLock),
|
||||
() -> createRegionSpan("Region.getRowLock").setAttribute(TraceUtil.ROW_LOCK_READ_LOCK_KEY,
|
||||
readLock));
|
||||
}
|
||||
|
||||
private void releaseRowLocks(List<RowLock> rowLocks) {
|
||||
if (rowLocks != null) {
|
||||
for (RowLock rowLock : rowLocks) {
|
||||
|
@ -7516,9 +7535,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
private List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
return TraceUtil.trace(() -> getInternal(get, withCoprocessor, nonceGroup, nonce),
|
||||
() -> createRegionSpan("Region.get"));
|
||||
}
|
||||
|
||||
private List<Cell> getInternal(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
|
||||
throws IOException {
|
||||
List<Cell> results = new ArrayList<>();
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
|
||||
// pre-get CP hook
|
||||
if (withCoprocessor && (coprocessorHost != null)) {
|
||||
|
@ -7531,13 +7556,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
|
||||
scan.setLoadColumnFamiliesOnDemand(isLoadingCfsOnDemandDefault());
|
||||
}
|
||||
RegionScanner scanner = null;
|
||||
try {
|
||||
scanner = getScanner(scan, null, nonceGroup, nonce);
|
||||
try (RegionScanner scanner = getScanner(scan, null, nonceGroup, nonce)) {
|
||||
scanner.next(results);
|
||||
} finally {
|
||||
if (scanner != null)
|
||||
scanner.close();
|
||||
}
|
||||
|
||||
// post-get CP hook
|
||||
|
@ -7609,7 +7629,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
RowLock prevRowLock = null;
|
||||
for (byte[] row : rowsToLock) {
|
||||
try {
|
||||
RowLock rowLock = region.getRowLockInternal(row, false, prevRowLock); // write lock
|
||||
RowLock rowLock = region.getRowLock(row, false, prevRowLock); // write lock
|
||||
if (rowLock != prevRowLock) {
|
||||
acquiredRowLocks.add(rowLock);
|
||||
prevRowLock = rowLock;
|
||||
|
@ -7656,15 +7676,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
public Result append(Append append, long nonceGroup, long nonce) throws IOException {
|
||||
checkReadOnly();
|
||||
checkResources();
|
||||
startRegionOperation(Operation.APPEND);
|
||||
try {
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
return mutate(append, true, nonceGroup, nonce).getResult();
|
||||
} finally {
|
||||
closeRegionOperation(Operation.APPEND);
|
||||
}
|
||||
return TraceUtil.trace(() -> {
|
||||
checkReadOnly();
|
||||
checkResources();
|
||||
startRegionOperation(Operation.APPEND);
|
||||
try {
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
return mutate(append, true, nonceGroup, nonce).getResult();
|
||||
} finally {
|
||||
closeRegionOperation(Operation.APPEND);
|
||||
}
|
||||
}, () -> createRegionSpan("Region.append"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -7673,15 +7695,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
public Result increment(Increment increment, long nonceGroup, long nonce) throws IOException {
|
||||
checkReadOnly();
|
||||
checkResources();
|
||||
startRegionOperation(Operation.INCREMENT);
|
||||
try {
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
return mutate(increment, true, nonceGroup, nonce).getResult();
|
||||
} finally {
|
||||
closeRegionOperation(Operation.INCREMENT);
|
||||
}
|
||||
return TraceUtil.trace(() -> {
|
||||
checkReadOnly();
|
||||
checkResources();
|
||||
startRegionOperation(Operation.INCREMENT);
|
||||
try {
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
return mutate(increment, true, nonceGroup, nonce).getResult();
|
||||
} finally {
|
||||
closeRegionOperation(Operation.INCREMENT);
|
||||
}
|
||||
}, () -> createRegionSpan("Region.increment"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
|
|||
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -254,39 +255,41 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
|
|||
|
||||
@Override
|
||||
public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
|
||||
if (storeHeap == null) {
|
||||
// scanner is closed
|
||||
throw new UnknownScannerException("Scanner was closed");
|
||||
}
|
||||
boolean moreValues = false;
|
||||
if (outResults.isEmpty()) {
|
||||
// Usually outResults is empty. This is true when next is called
|
||||
// to handle scan or get operation.
|
||||
moreValues = nextInternal(outResults, scannerContext);
|
||||
} else {
|
||||
List<Cell> tmpList = new ArrayList<>();
|
||||
moreValues = nextInternal(tmpList, scannerContext);
|
||||
outResults.addAll(tmpList);
|
||||
}
|
||||
|
||||
if (!outResults.isEmpty()) {
|
||||
region.addReadRequestsCount(1);
|
||||
if (region.getMetrics() != null) {
|
||||
region.getMetrics().updateReadRequestCount();
|
||||
return TraceUtil.trace(() -> {
|
||||
if (storeHeap == null) {
|
||||
// scanner is closed
|
||||
throw new UnknownScannerException("Scanner was closed");
|
||||
}
|
||||
boolean moreValues = false;
|
||||
if (outResults.isEmpty()) {
|
||||
// Usually outResults is empty. This is true when next is called
|
||||
// to handle scan or get operation.
|
||||
moreValues = nextInternal(outResults, scannerContext);
|
||||
} else {
|
||||
List<Cell> tmpList = new ArrayList<>();
|
||||
moreValues = nextInternal(tmpList, scannerContext);
|
||||
outResults.addAll(tmpList);
|
||||
}
|
||||
}
|
||||
|
||||
// If the size limit was reached it means a partial Result is being returned. Returning a
|
||||
// partial Result means that we should not reset the filters; filters should only be reset in
|
||||
// between rows
|
||||
if (!scannerContext.mayHaveMoreCellsInRow()) {
|
||||
resetFilters();
|
||||
}
|
||||
if (!outResults.isEmpty()) {
|
||||
region.addReadRequestsCount(1);
|
||||
if (region.getMetrics() != null) {
|
||||
region.getMetrics().updateReadRequestCount();
|
||||
}
|
||||
}
|
||||
|
||||
if (isFilterDoneInternal()) {
|
||||
moreValues = false;
|
||||
}
|
||||
return moreValues;
|
||||
// If the size limit was reached it means a partial Result is being returned. Returning a
|
||||
// partial Result means that we should not reset the filters; filters should only be reset in
|
||||
// between rows
|
||||
if (!scannerContext.mayHaveMoreCellsInRow()) {
|
||||
resetFilters();
|
||||
}
|
||||
|
||||
if (isFilterDoneInternal()) {
|
||||
moreValues = false;
|
||||
}
|
||||
return moreValues;
|
||||
}, () -> region.createRegionSpan("RegionScanner.next"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -725,8 +728,9 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
|
|||
return c > 0 || (c == 0 && !includeStopRow);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() {
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
|
||||
justification = "this method is only called inside close which is synchronized")
|
||||
private void closeInternal() {
|
||||
if (storeHeap != null) {
|
||||
storeHeap.close();
|
||||
storeHeap = null;
|
||||
|
@ -740,24 +744,31 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
|
|||
this.filterClosed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() {
|
||||
TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean reseek(byte[] row) throws IOException {
|
||||
if (row == null) {
|
||||
throw new IllegalArgumentException("Row cannot be null.");
|
||||
}
|
||||
boolean result = false;
|
||||
region.startRegionOperation();
|
||||
Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
|
||||
try {
|
||||
// use request seek to make use of the lazy seek option. See HBASE-5520
|
||||
result = this.storeHeap.requestSeek(kv, true, true);
|
||||
if (this.joinedHeap != null) {
|
||||
result = this.joinedHeap.requestSeek(kv, true, true) || result;
|
||||
return TraceUtil.trace(() -> {
|
||||
if (row == null) {
|
||||
throw new IllegalArgumentException("Row cannot be null.");
|
||||
}
|
||||
} finally {
|
||||
region.closeRegionOperation();
|
||||
}
|
||||
return result;
|
||||
boolean result = false;
|
||||
region.startRegionOperation();
|
||||
Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
|
||||
try {
|
||||
// use request seek to make use of the lazy seek option. See HBASE-5520
|
||||
result = this.storeHeap.requestSeek(kv, true, true);
|
||||
if (this.joinedHeap != null) {
|
||||
result = this.joinedHeap.requestSeek(kv, true, true) || result;
|
||||
}
|
||||
} finally {
|
||||
region.closeRegionOperation();
|
||||
}
|
||||
return result;
|
||||
}, () -> region.createRegionSpan("RegionScanner.reseek"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -725,7 +725,7 @@ public class TestAtomicOperation {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RowLock getRowLockInternal(final byte[] row, boolean readLock,
|
||||
protected RowLock getRowLockInternal(final byte[] row, boolean readLock,
|
||||
final RowLock prevRowlock) throws IOException {
|
||||
if (testStep == TestStep.CHECKANDPUT_STARTED) {
|
||||
latch.countDown();
|
||||
|
|
|
@ -0,0 +1,187 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNameTestRule;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
|
||||
@Category({ RegionServerTests.class, MediumTests.class })
|
||||
public class TestHRegionTracing {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHRegionTracing.class);
|
||||
|
||||
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static byte[] FAMILY = Bytes.toBytes("family");
|
||||
|
||||
private static byte[] QUALIFIER = Bytes.toBytes("qual");
|
||||
|
||||
private static byte[] ROW = Bytes.toBytes("row");
|
||||
|
||||
private static byte[] VALUE = Bytes.toBytes("value");
|
||||
|
||||
@Rule
|
||||
public final OpenTelemetryRule traceRule = OpenTelemetryRule.create();
|
||||
|
||||
@Rule
|
||||
public final TableNameTestRule tableNameRule = new TableNameTestRule();
|
||||
|
||||
private static WAL WAL;
|
||||
|
||||
private HRegion region;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws IOException {
|
||||
WAL = HBaseTestingUtility.createWal(UTIL.getConfiguration(), UTIL.getDataTestDir(), null);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws IOException {
|
||||
Closeables.close(WAL, true);
|
||||
UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
TableName tableName = tableNameRule.getTableName();
|
||||
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
|
||||
RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
|
||||
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
|
||||
MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
|
||||
region = HRegion.createHRegion(info, UTIL.getDataTestDir(), UTIL.getConfiguration(), desc, WAL);
|
||||
region = UTIL.createLocalHRegion(info, desc);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
if (region != null) {
|
||||
region.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void assertSpan(String spanName) {
|
||||
assertTrue(traceRule.getSpans().stream().anyMatch(span -> {
|
||||
if (!span.getName().equals(spanName)) {
|
||||
return false;
|
||||
}
|
||||
List<String> regionNames = span.getAttributes().get(TraceUtil.REGION_NAMES_KEY);
|
||||
return regionNames != null && regionNames.size() == 1 &&
|
||||
regionNames.get(0).equals(region.getRegionInfo().getRegionNameAsString());
|
||||
}));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGet() throws IOException {
|
||||
region.get(new Get(ROW));
|
||||
assertSpan("Region.get");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPut() throws IOException {
|
||||
region.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
|
||||
assertSpan("Region.put");
|
||||
assertSpan("Region.getRowLock");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelete() throws IOException {
|
||||
region.delete(new Delete(ROW).addColumn(FAMILY, QUALIFIER));
|
||||
assertSpan("Region.delete");
|
||||
assertSpan("Region.getRowLock");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppend() throws IOException {
|
||||
region.append(new Append(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
|
||||
assertSpan("Region.append");
|
||||
assertSpan("Region.getRowLock");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrement() throws IOException {
|
||||
region.increment(new Increment(ROW).addColumn(FAMILY, QUALIFIER, 1));
|
||||
assertSpan("Region.increment");
|
||||
assertSpan("Region.getRowLock");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchMutate() throws IOException {
|
||||
region.batchMutate(new Mutation[] { new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE) });
|
||||
assertSpan("Region.batchMutate");
|
||||
assertSpan("Region.getRowLock");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutate() throws IOException {
|
||||
region.checkAndMutate(CheckAndMutate.newBuilder(ROW).ifNotExists(FAMILY, QUALIFIER)
|
||||
.build(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)));
|
||||
assertSpan("Region.checkAndMutate");
|
||||
assertSpan("Region.getRowLock");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScanner() throws IOException {
|
||||
try (RegionScanner scanner = region.getScanner(new Scan())) {
|
||||
scanner.reseek(ROW);
|
||||
scanner.next(new ArrayList<>());
|
||||
}
|
||||
assertSpan("Region.getScanner");
|
||||
assertSpan("RegionScanner.reseek");
|
||||
assertSpan("RegionScanner.next");
|
||||
assertSpan("RegionScanner.close");
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue