HADOOP-16459. Backport of HADOOP-16266. Add more fine-grained processing time metrics to the RPC layer. Contributed by Christopher Gregorian.
This commit also includes the follow-on commit827a84778a
. (cherry-picked fromf96a2df38d
)
This commit is contained in:
parent
168dc3f258
commit
d4492bdd9e
|
@ -193,9 +193,8 @@ public class CallQueueManager<E extends Schedulable>
|
||||||
return scheduler.shouldBackOff(e);
|
return scheduler.shouldBackOff(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addResponseTime(String name, int priorityLevel, int queueTime,
|
void addResponseTime(String name, Schedulable e, ProcessingDetails details) {
|
||||||
int processingTime) {
|
scheduler.addResponseTime(name, e, details);
|
||||||
scheduler.addResponseTime(name, priorityLevel, queueTime, processingTime);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This should be only called once per call and cached in the call object
|
// This should be only called once per call and cached in the call object
|
||||||
|
|
|
@ -55,6 +55,8 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The decay RPC scheduler counts incoming requests in a map, then
|
* The decay RPC scheduler counts incoming requests in a map, then
|
||||||
* decays the counts at a fixed time interval. The scheduler is optimized
|
* decays the counts at a fixed time interval. The scheduler is optimized
|
||||||
|
@ -600,14 +602,18 @@ public class DecayRpcScheduler implements RpcScheduler,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addResponseTime(String name, int priorityLevel, int queueTime,
|
public void addResponseTime(String callName, Schedulable schedulable,
|
||||||
int processingTime) {
|
ProcessingDetails details) {
|
||||||
|
int priorityLevel = schedulable.getPriorityLevel();
|
||||||
|
long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
|
||||||
|
long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
|
responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
|
||||||
responseTimeTotalInCurrWindow.getAndAdd(priorityLevel,
|
responseTimeTotalInCurrWindow.getAndAdd(priorityLevel,
|
||||||
queueTime+processingTime);
|
queueTime+processingTime);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("addResponseTime for call: {} priority: {} queueTime: {} " +
|
LOG.debug("addResponseTime for call: {} priority: {} queueTime: {} " +
|
||||||
"processingTime: {} ", name, priorityLevel, queueTime,
|
"processingTime: {} ", callName, priorityLevel, queueTime,
|
||||||
processingTime);
|
processingTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,8 +35,8 @@ public class DefaultRpcScheduler implements RpcScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addResponseTime(String name, int priorityLevel, int queueTime,
|
public void addResponseTime(String callName, Schedulable schedulable,
|
||||||
int processingTime) {
|
ProcessingDetails details) {
|
||||||
}
|
}
|
||||||
|
|
||||||
public DefaultRpcScheduler(int priorityLevels, String namespace,
|
public DefaultRpcScheduler(int priorityLevels, String namespace,
|
||||||
|
|
|
@ -37,6 +37,11 @@ public abstract class ExternalCall<T> extends Call {
|
||||||
this.action = action;
|
this.action = action;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDetailedMetricsName() {
|
||||||
|
return "(external)";
|
||||||
|
}
|
||||||
|
|
||||||
public abstract UserGroupInformation getRemoteUser();
|
public abstract UserGroupInformation getRemoteUser();
|
||||||
|
|
||||||
public final T get() throws InterruptedException, ExecutionException {
|
public final T get() throws InterruptedException, ExecutionException {
|
||||||
|
|
|
@ -0,0 +1,96 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stores the times that a call takes to be processed through each step.
|
||||||
|
*/
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class ProcessingDetails {
|
||||||
|
public static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ProcessingDetails.class);
|
||||||
|
private final TimeUnit valueTimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The different stages to track the time of.
|
||||||
|
*/
|
||||||
|
public enum Timing {
|
||||||
|
ENQUEUE, // time for reader to insert in call queue.
|
||||||
|
QUEUE, // time in the call queue.
|
||||||
|
HANDLER, // handler overhead not spent in processing/response.
|
||||||
|
PROCESSING, // time handler spent processing the call. always equal to
|
||||||
|
// lock_free + lock_wait + lock_shared + lock_exclusive
|
||||||
|
LOCKFREE, // processing with no lock.
|
||||||
|
LOCKWAIT, // processing while waiting for lock.
|
||||||
|
LOCKSHARED, // processing with a read lock.
|
||||||
|
LOCKEXCLUSIVE, // processing with a write lock.
|
||||||
|
RESPONSE; // time to encode and send response.
|
||||||
|
}
|
||||||
|
|
||||||
|
private long[] timings = new long[Timing.values().length];
|
||||||
|
|
||||||
|
ProcessingDetails(TimeUnit timeUnit) {
|
||||||
|
this.valueTimeUnit = timeUnit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long get(Timing type) {
|
||||||
|
// When using nanoTime to fetch timing information, it is possible to see
|
||||||
|
// time "move backward" slightly under unusual/rare circumstances. To avoid
|
||||||
|
// displaying a confusing number, round such timings to 0 here.
|
||||||
|
long ret = timings[type.ordinal()];
|
||||||
|
return ret < 0 ? 0 : ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long get(Timing type, TimeUnit timeUnit) {
|
||||||
|
return timeUnit.convert(get(type), valueTimeUnit);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void set(Timing type, long value) {
|
||||||
|
timings[type.ordinal()] = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void set(Timing type, long value, TimeUnit timeUnit) {
|
||||||
|
set(type, valueTimeUnit.convert(value, timeUnit));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(Timing type, long value, TimeUnit timeUnit) {
|
||||||
|
timings[type.ordinal()] += valueTimeUnit.convert(value, timeUnit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder sb = new StringBuilder(256);
|
||||||
|
for (Timing type : Timing.values()) {
|
||||||
|
if (sb.length() > 0) {
|
||||||
|
sb.append(" ");
|
||||||
|
}
|
||||||
|
sb.append(type.name().toLowerCase())
|
||||||
|
.append("Time=").append(get(type));
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
|
@ -520,46 +520,29 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
Message param = request.getValue(prototype);
|
Message param = request.getValue(prototype);
|
||||||
|
|
||||||
Message result;
|
Message result;
|
||||||
long startTime = Time.now();
|
Call currentCall = Server.getCurCall().get();
|
||||||
int qTime = (int) (startTime - receiveTime);
|
|
||||||
Exception exception = null;
|
|
||||||
boolean isDeferred = false;
|
|
||||||
try {
|
try {
|
||||||
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
|
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
|
||||||
currentCallInfo.set(new CallInfo(server, methodName));
|
currentCallInfo.set(new CallInfo(server, methodName));
|
||||||
|
currentCall.setDetailedMetricsName(methodName);
|
||||||
result = service.callBlockingMethod(methodDescriptor, null, param);
|
result = service.callBlockingMethod(methodDescriptor, null, param);
|
||||||
// Check if this needs to be a deferred response,
|
// Check if this needs to be a deferred response,
|
||||||
// by checking the ThreadLocal callback being set
|
// by checking the ThreadLocal callback being set
|
||||||
if (currentCallback.get() != null) {
|
if (currentCallback.get() != null) {
|
||||||
Server.getCurCall().get().deferResponse();
|
currentCall.deferResponse();
|
||||||
isDeferred = true;
|
|
||||||
currentCallback.set(null);
|
currentCallback.set(null);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
exception = (Exception) e.getCause();
|
Exception exception = (Exception) e.getCause();
|
||||||
|
currentCall.setDetailedMetricsName(
|
||||||
|
exception.getClass().getSimpleName());
|
||||||
throw (Exception) e.getCause();
|
throw (Exception) e.getCause();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
exception = e;
|
currentCall.setDetailedMetricsName(e.getClass().getSimpleName());
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
currentCallInfo.set(null);
|
currentCallInfo.set(null);
|
||||||
int processingTime = (int) (Time.now() - startTime);
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
String msg =
|
|
||||||
"Served: " + methodName + (isDeferred ? ", deferred" : "") +
|
|
||||||
", queueTime= " + qTime +
|
|
||||||
" procesingTime= " + processingTime;
|
|
||||||
if (exception != null) {
|
|
||||||
msg += " exception= " + exception.getClass().getSimpleName();
|
|
||||||
}
|
|
||||||
LOG.debug(msg);
|
|
||||||
}
|
|
||||||
String detailedMetricsName = (exception == null) ?
|
|
||||||
methodName :
|
|
||||||
exception.getClass().getSimpleName();
|
|
||||||
server.updateMetrics(detailedMetricsName, qTime, processingTime,
|
|
||||||
isDeferred);
|
|
||||||
}
|
}
|
||||||
return RpcWritable.wrap(result);
|
return RpcWritable.wrap(result);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ipc;
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implement this interface to be used for RPC scheduling and backoff.
|
* Implement this interface to be used for RPC scheduling and backoff.
|
||||||
*
|
*
|
||||||
|
@ -30,8 +32,43 @@ public interface RpcScheduler {
|
||||||
|
|
||||||
boolean shouldBackOff(Schedulable obj);
|
boolean shouldBackOff(Schedulable obj);
|
||||||
|
|
||||||
void addResponseTime(String name, int priorityLevel, int queueTime,
|
/**
|
||||||
int processingTime);
|
* This method only exists to maintain backwards compatibility with old
|
||||||
|
* implementations. It will not be called by any Hadoop code, and should not
|
||||||
|
* be implemented by new implementations.
|
||||||
|
*
|
||||||
|
* @deprecated Use
|
||||||
|
* {@link #addResponseTime(String, Schedulable, ProcessingDetails)} instead.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
default void addResponseTime(String name, int priorityLevel, int queueTime,
|
||||||
|
int processingTime) {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"This method is deprecated: use the other addResponseTime");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store a processing time value for an RPC call into this scheduler.
|
||||||
|
*
|
||||||
|
* @param callName The name of the call.
|
||||||
|
* @param schedulable The schedulable representing the incoming call.
|
||||||
|
* @param details The details of processing time.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
default void addResponseTime(String callName, Schedulable schedulable,
|
||||||
|
ProcessingDetails details) {
|
||||||
|
// For the sake of backwards compatibility with old implementations of
|
||||||
|
// this interface, a default implementation is supplied which uses the old
|
||||||
|
// method. All new implementations MUST override this interface and should
|
||||||
|
// NOT use the other addResponseTime method.
|
||||||
|
int queueTimeMs = (int)
|
||||||
|
details.get(ProcessingDetails.Timing.QUEUE, TimeUnit.MILLISECONDS);
|
||||||
|
int processingTimeMs = (int)
|
||||||
|
details.get(ProcessingDetails.Timing.PROCESSING, TimeUnit.MILLISECONDS);
|
||||||
|
addResponseTime(callName, schedulable.getPriorityLevel(),
|
||||||
|
queueTimeMs, processingTimeMs);
|
||||||
|
}
|
||||||
|
|
||||||
void stop();
|
void stop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ipc;
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
|
||||||
import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID;
|
import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID;
|
||||||
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
|
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
|
||||||
import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
|
import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
|
||||||
|
@ -63,6 +64,7 @@ import java.util.TimerTask;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
@ -491,7 +493,7 @@ public abstract class Server {
|
||||||
* if and only if it falls above 99.7% of requests. We start this logic
|
* if and only if it falls above 99.7% of requests. We start this logic
|
||||||
* only once we have enough sample size.
|
* only once we have enough sample size.
|
||||||
*/
|
*/
|
||||||
void logSlowRpcCalls(String methodName, int processingTime) {
|
void logSlowRpcCalls(String methodName, Call call, long processingTime) {
|
||||||
final int deviation = 3;
|
final int deviation = 3;
|
||||||
|
|
||||||
// 1024 for minSampleSize just a guess -- not a number computed based on
|
// 1024 for minSampleSize just a guess -- not a number computed based on
|
||||||
|
@ -504,27 +506,47 @@ public abstract class Server {
|
||||||
|
|
||||||
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
|
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
|
||||||
(processingTime > threeSigma)) {
|
(processingTime > threeSigma)) {
|
||||||
if(LOG.isWarnEnabled()) {
|
LOG.warn("Slow RPC : {} took {} {} to process from client {}",
|
||||||
String client = CurCall.get().toString();
|
methodName, processingTime, RpcMetrics.TIMEUNIT, call);
|
||||||
LOG.warn(
|
|
||||||
"Slow RPC : " + methodName + " took " + processingTime +
|
|
||||||
" milliseconds to process from client " + client);
|
|
||||||
}
|
|
||||||
rpcMetrics.incrSlowRpc();
|
rpcMetrics.incrSlowRpc();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateMetrics(String name, int queueTime, int processingTime,
|
void updateMetrics(Call call, long startTime, boolean connDropped) {
|
||||||
boolean deferredCall) {
|
// delta = handler + processing + response
|
||||||
|
long deltaNanos = Time.monotonicNowNanos() - startTime;
|
||||||
|
long timestampNanos = call.timestampNanos;
|
||||||
|
|
||||||
|
ProcessingDetails details = call.getProcessingDetails();
|
||||||
|
// queue time is the delta between when the call first arrived and when it
|
||||||
|
// began being serviced, minus the time it took to be put into the queue
|
||||||
|
details.set(Timing.QUEUE,
|
||||||
|
startTime - timestampNanos - details.get(Timing.ENQUEUE));
|
||||||
|
deltaNanos -= details.get(Timing.PROCESSING);
|
||||||
|
deltaNanos -= details.get(Timing.RESPONSE);
|
||||||
|
details.set(Timing.HANDLER, deltaNanos);
|
||||||
|
|
||||||
|
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
|
||||||
rpcMetrics.addRpcQueueTime(queueTime);
|
rpcMetrics.addRpcQueueTime(queueTime);
|
||||||
if (!deferredCall) {
|
|
||||||
rpcMetrics.addRpcProcessingTime(processingTime);
|
if (call.isResponseDeferred() || connDropped) {
|
||||||
rpcDetailedMetrics.addProcessingTime(name, processingTime);
|
// call was skipped; don't include it in processing metrics
|
||||||
callQueue.addResponseTime(name, getPriorityLevel(), queueTime,
|
return;
|
||||||
processingTime);
|
|
||||||
if (isLogSlowRPC()) {
|
|
||||||
logSlowRpcCalls(name, processingTime);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long processingTime =
|
||||||
|
details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
|
||||||
|
long waitTime =
|
||||||
|
details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
|
||||||
|
rpcMetrics.addRpcLockWaitTime(waitTime);
|
||||||
|
rpcMetrics.addRpcProcessingTime(processingTime);
|
||||||
|
// don't include lock wait for detailed metrics.
|
||||||
|
processingTime -= waitTime;
|
||||||
|
String name = call.getDetailedMetricsName();
|
||||||
|
rpcDetailedMetrics.addProcessingTime(name, processingTime);
|
||||||
|
callQueue.addResponseTime(name, call, details);
|
||||||
|
if (isLogSlowRPC()) {
|
||||||
|
logSlowRpcCalls(name, call, processingTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -693,9 +715,13 @@ public abstract class Server {
|
||||||
/** A generic call queued for handling. */
|
/** A generic call queued for handling. */
|
||||||
public static class Call implements Schedulable,
|
public static class Call implements Schedulable,
|
||||||
PrivilegedExceptionAction<Void> {
|
PrivilegedExceptionAction<Void> {
|
||||||
|
private final ProcessingDetails processingDetails =
|
||||||
|
new ProcessingDetails(TimeUnit.NANOSECONDS);
|
||||||
|
// the method name to use in metrics
|
||||||
|
private volatile String detailedMetricsName = "";
|
||||||
final int callId; // the client's call id
|
final int callId; // the client's call id
|
||||||
final int retryCount; // the retry count of the call
|
final int retryCount; // the retry count of the call
|
||||||
long timestamp; // time received when response is null
|
long timestampNanos; // time received when response is null
|
||||||
// time served when response is not null
|
// time served when response is not null
|
||||||
private AtomicInteger responseWaitCount = new AtomicInteger(1);
|
private AtomicInteger responseWaitCount = new AtomicInteger(1);
|
||||||
final RPC.RpcKind rpcKind;
|
final RPC.RpcKind rpcKind;
|
||||||
|
@ -732,7 +758,7 @@ public abstract class Server {
|
||||||
TraceScope traceScope, CallerContext callerContext) {
|
TraceScope traceScope, CallerContext callerContext) {
|
||||||
this.callId = id;
|
this.callId = id;
|
||||||
this.retryCount = retryCount;
|
this.retryCount = retryCount;
|
||||||
this.timestamp = Time.now();
|
this.timestampNanos = Time.monotonicNowNanos();
|
||||||
this.rpcKind = kind;
|
this.rpcKind = kind;
|
||||||
this.clientId = clientId;
|
this.clientId = clientId;
|
||||||
this.traceScope = traceScope;
|
this.traceScope = traceScope;
|
||||||
|
@ -741,6 +767,28 @@ public abstract class Server {
|
||||||
this.isCallCoordinated = false;
|
this.isCallCoordinated = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates whether the call has been processed. Always true unless
|
||||||
|
* overridden.
|
||||||
|
*
|
||||||
|
* @return true
|
||||||
|
*/
|
||||||
|
boolean isOpen() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
String getDetailedMetricsName() {
|
||||||
|
return detailedMetricsName;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setDetailedMetricsName(String name) {
|
||||||
|
detailedMetricsName = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ProcessingDetails getProcessingDetails() {
|
||||||
|
return processingDetails;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Call#" + callId + " Retry#" + retryCount;
|
return "Call#" + callId + " Retry#" + retryCount;
|
||||||
|
@ -888,6 +936,11 @@ public abstract class Server {
|
||||||
this.rpcRequest = param;
|
this.rpcRequest = param;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean isOpen() {
|
||||||
|
return connection.channel.isOpen();
|
||||||
|
}
|
||||||
|
|
||||||
void setResponseFields(Writable returnValue,
|
void setResponseFields(Writable returnValue,
|
||||||
ResponseParams responseParams) {
|
ResponseParams responseParams) {
|
||||||
this.rv = returnValue;
|
this.rv = returnValue;
|
||||||
|
@ -915,18 +968,33 @@ public abstract class Server {
|
||||||
Server.LOG.info(Thread.currentThread().getName() + ": skipped " + this);
|
Server.LOG.info(Thread.currentThread().getName() + ": skipped " + this);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long startNanos = Time.monotonicNowNanos();
|
||||||
Writable value = null;
|
Writable value = null;
|
||||||
ResponseParams responseParams = new ResponseParams();
|
ResponseParams responseParams = new ResponseParams();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
value = call(
|
value = call(
|
||||||
rpcKind, connection.protocolName, rpcRequest, timestamp);
|
rpcKind, connection.protocolName, rpcRequest, timestampNanos);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
populateResponseParamsOnError(e, responseParams);
|
populateResponseParamsOnError(e, responseParams);
|
||||||
}
|
}
|
||||||
if (!isResponseDeferred()) {
|
if (!isResponseDeferred()) {
|
||||||
|
long deltaNanos = Time.monotonicNowNanos() - startNanos;
|
||||||
|
ProcessingDetails details = getProcessingDetails();
|
||||||
|
|
||||||
|
details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
|
||||||
|
deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
|
||||||
|
deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
|
||||||
|
deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
|
||||||
|
details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
|
||||||
|
startNanos = Time.monotonicNowNanos();
|
||||||
|
|
||||||
setResponseFields(value, responseParams);
|
setResponseFields(value, responseParams);
|
||||||
sendResponse();
|
sendResponse();
|
||||||
|
|
||||||
|
deltaNanos = Time.monotonicNowNanos() - startNanos;
|
||||||
|
details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Deferring response for callId: " + this.callId);
|
LOG.debug("Deferring response for callId: " + this.callId);
|
||||||
|
@ -1346,13 +1414,14 @@ public abstract class Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final static long PURGE_INTERVAL_NANOS = TimeUnit.NANOSECONDS.convert(
|
||||||
|
15, TimeUnit.MINUTES);
|
||||||
|
|
||||||
// Sends responses of RPC back to clients.
|
// Sends responses of RPC back to clients.
|
||||||
private class Responder extends Thread {
|
private class Responder extends Thread {
|
||||||
private final Selector writeSelector;
|
private final Selector writeSelector;
|
||||||
private int pending; // connections waiting to register
|
private int pending; // connections waiting to register
|
||||||
|
|
||||||
final static int PURGE_INTERVAL = 900000; // 15mins
|
|
||||||
|
|
||||||
Responder() throws IOException {
|
Responder() throws IOException {
|
||||||
this.setName("IPC Server Responder");
|
this.setName("IPC Server Responder");
|
||||||
this.setDaemon(true);
|
this.setDaemon(true);
|
||||||
|
@ -1377,12 +1446,13 @@ public abstract class Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doRunLoop() {
|
private void doRunLoop() {
|
||||||
long lastPurgeTime = 0; // last check for old calls.
|
long lastPurgeTimeNanos = 0; // last check for old calls.
|
||||||
|
|
||||||
while (running) {
|
while (running) {
|
||||||
try {
|
try {
|
||||||
waitPending(); // If a channel is being registered, wait.
|
waitPending(); // If a channel is being registered, wait.
|
||||||
writeSelector.select(PURGE_INTERVAL);
|
writeSelector.select(
|
||||||
|
TimeUnit.NANOSECONDS.toMillis(PURGE_INTERVAL_NANOS));
|
||||||
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
|
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
SelectionKey key = iter.next();
|
SelectionKey key = iter.next();
|
||||||
|
@ -1404,11 +1474,11 @@ public abstract class Server {
|
||||||
LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
|
LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
long now = Time.now();
|
long nowNanos = Time.monotonicNowNanos();
|
||||||
if (now < lastPurgeTime + PURGE_INTERVAL) {
|
if (nowNanos < lastPurgeTimeNanos + PURGE_INTERVAL_NANOS) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
lastPurgeTime = now;
|
lastPurgeTimeNanos = nowNanos;
|
||||||
//
|
//
|
||||||
// If there were some calls that have not been sent out for a
|
// If there were some calls that have not been sent out for a
|
||||||
// long time, discard them.
|
// long time, discard them.
|
||||||
|
@ -1432,7 +1502,7 @@ public abstract class Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (RpcCall call : calls) {
|
for (RpcCall call : calls) {
|
||||||
doPurge(call, now);
|
doPurge(call, nowNanos);
|
||||||
}
|
}
|
||||||
} catch (OutOfMemoryError e) {
|
} catch (OutOfMemoryError e) {
|
||||||
//
|
//
|
||||||
|
@ -1483,7 +1553,7 @@ public abstract class Server {
|
||||||
Iterator<RpcCall> iter = responseQueue.listIterator(0);
|
Iterator<RpcCall> iter = responseQueue.listIterator(0);
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
call = iter.next();
|
call = iter.next();
|
||||||
if (now > call.timestamp + PURGE_INTERVAL) {
|
if (now > call.timestampNanos + PURGE_INTERVAL_NANOS) {
|
||||||
closeConnection(call.connection);
|
closeConnection(call.connection);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1547,7 +1617,7 @@ public abstract class Server {
|
||||||
|
|
||||||
if (inHandler) {
|
if (inHandler) {
|
||||||
// set the serve time when the response has to be sent later
|
// set the serve time when the response has to be sent later
|
||||||
call.timestamp = Time.now();
|
call.timestampNanos = Time.monotonicNowNanos();
|
||||||
|
|
||||||
incPending();
|
incPending();
|
||||||
try {
|
try {
|
||||||
|
@ -2731,6 +2801,9 @@ public abstract class Server {
|
||||||
} else {
|
} else {
|
||||||
callQueue.add(call);
|
callQueue.add(call);
|
||||||
}
|
}
|
||||||
|
long deltaNanos = Time.monotonicNowNanos() - call.timestampNanos;
|
||||||
|
call.getProcessingDetails().set(Timing.ENQUEUE, deltaNanos,
|
||||||
|
TimeUnit.NANOSECONDS);
|
||||||
} catch (CallQueueOverflowException cqe) {
|
} catch (CallQueueOverflowException cqe) {
|
||||||
// If rpc scheduler indicates back off based on performance degradation
|
// If rpc scheduler indicates back off based on performance degradation
|
||||||
// such as response time or rpc queue is full, we will ask the client
|
// such as response time or rpc queue is full, we will ask the client
|
||||||
|
@ -2757,8 +2830,16 @@ public abstract class Server {
|
||||||
SERVER.set(Server.this);
|
SERVER.set(Server.this);
|
||||||
while (running) {
|
while (running) {
|
||||||
TraceScope traceScope = null;
|
TraceScope traceScope = null;
|
||||||
|
Call call = null;
|
||||||
|
long startTimeNanos = 0;
|
||||||
|
// True iff the connection for this call has been dropped.
|
||||||
|
// Set to true by default and update to false later if the connection
|
||||||
|
// can be succesfully read.
|
||||||
|
boolean connDropped = true;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final Call call = callQueue.take(); // pop the queue; maybe blocked here
|
call = callQueue.take(); // pop the queue; maybe blocked here
|
||||||
|
startTimeNanos = Time.monotonicNowNanos();
|
||||||
if (alignmentContext != null && call.isCallCoordinated() &&
|
if (alignmentContext != null && call.isCallCoordinated() &&
|
||||||
call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
|
call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
|
||||||
/*
|
/*
|
||||||
|
@ -2789,6 +2870,7 @@ public abstract class Server {
|
||||||
// always update the current call context
|
// always update the current call context
|
||||||
CallerContext.setCurrent(call.callerContext);
|
CallerContext.setCurrent(call.callerContext);
|
||||||
UserGroupInformation remoteUser = call.getRemoteUser();
|
UserGroupInformation remoteUser = call.getRemoteUser();
|
||||||
|
connDropped = !call.isOpen();
|
||||||
if (remoteUser != null) {
|
if (remoteUser != null) {
|
||||||
remoteUser.doAs(call);
|
remoteUser.doAs(call);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2811,6 +2893,14 @@ public abstract class Server {
|
||||||
} finally {
|
} finally {
|
||||||
CurCall.set(null);
|
CurCall.set(null);
|
||||||
IOUtils.cleanupWithLogger(LOG, traceScope);
|
IOUtils.cleanupWithLogger(LOG, traceScope);
|
||||||
|
if (call != null) {
|
||||||
|
updateMetrics(call, startTimeNanos, connDropped);
|
||||||
|
ProcessingDetails.LOG.debug(
|
||||||
|
"Served: [{}]{} name={} user={} details={}",
|
||||||
|
call, (call.isResponseDeferred() ? ", deferred" : ""),
|
||||||
|
call.getDetailedMetricsName(), call.getRemoteUser(),
|
||||||
|
call.getProcessingDetails());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.debug(Thread.currentThread().getName() + ": exiting");
|
LOG.debug(Thread.currentThread().getName() + ": exiting");
|
||||||
|
|
|
@ -537,15 +537,15 @@ public class WritableRpcEngine implements RpcEngine {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Invoke the protocol method
|
// Invoke the protocol method
|
||||||
long startTime = Time.now();
|
|
||||||
int qTime = (int) (startTime-receivedTime);
|
|
||||||
Exception exception = null;
|
Exception exception = null;
|
||||||
|
Call currentCall = Server.getCurCall().get();
|
||||||
try {
|
try {
|
||||||
Method method =
|
Method method =
|
||||||
protocolImpl.protocolClass.getMethod(call.getMethodName(),
|
protocolImpl.protocolClass.getMethod(call.getMethodName(),
|
||||||
call.getParameterClasses());
|
call.getParameterClasses());
|
||||||
method.setAccessible(true);
|
method.setAccessible(true);
|
||||||
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
|
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
|
||||||
|
currentCall.setDetailedMetricsName(call.getMethodName());
|
||||||
Object value =
|
Object value =
|
||||||
method.invoke(protocolImpl.protocolImpl, call.getParameters());
|
method.invoke(protocolImpl.protocolImpl, call.getParameters());
|
||||||
if (server.verbose) log("Return: "+value);
|
if (server.verbose) log("Return: "+value);
|
||||||
|
@ -571,20 +571,10 @@ public class WritableRpcEngine implements RpcEngine {
|
||||||
exception = ioe;
|
exception = ioe;
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
int processingTime = (int) (Time.now() - startTime);
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
String msg = "Served: " + call.getMethodName() +
|
|
||||||
" queueTime= " + qTime + " procesingTime= " + processingTime;
|
|
||||||
if (exception != null) {
|
if (exception != null) {
|
||||||
msg += " exception= " + exception.getClass().getSimpleName();
|
currentCall.setDetailedMetricsName(
|
||||||
|
exception.getClass().getSimpleName());
|
||||||
}
|
}
|
||||||
LOG.debug(msg);
|
|
||||||
}
|
|
||||||
String detailedMetricsName = (exception == null) ?
|
|
||||||
call.getMethodName() :
|
|
||||||
exception.getClass().getSimpleName();
|
|
||||||
server
|
|
||||||
.updateMetrics(detailedMetricsName, qTime, processingTime, false);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,12 +66,12 @@ public class RpcDetailedMetrics {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add an RPC processing time sample
|
* Add an RPC processing time sample
|
||||||
* @param name of the RPC call
|
* @param rpcCallName of the RPC call
|
||||||
* @param processingTime the processing time
|
* @param processingTime the processing time
|
||||||
*/
|
*/
|
||||||
//@Override // some instrumentation interface
|
//@Override // some instrumentation interface
|
||||||
public void addProcessingTime(String name, int processingTime) {
|
public void addProcessingTime(String rpcCallName, long processingTime) {
|
||||||
rates.add(name, processingTime);
|
rates.add(rpcCallName, processingTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addDeferredProcessingTime(String name, long processingTime) {
|
public void addDeferredProcessingTime(String name, long processingTime) {
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ipc.metrics;
|
package org.apache.hadoop.ipc.metrics;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
@ -27,7 +29,6 @@ import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
|
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
|
@ -47,6 +48,8 @@ public class RpcMetrics {
|
||||||
final MetricsRegistry registry;
|
final MetricsRegistry registry;
|
||||||
final String name;
|
final String name;
|
||||||
final boolean rpcQuantileEnable;
|
final boolean rpcQuantileEnable;
|
||||||
|
/** The time unit used when storing/accessing time durations. */
|
||||||
|
public final static TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS;
|
||||||
|
|
||||||
RpcMetrics(Server server, Configuration conf) {
|
RpcMetrics(Server server, Configuration conf) {
|
||||||
String port = String.valueOf(server.getListenerAddress().getPort());
|
String port = String.valueOf(server.getListenerAddress().getPort());
|
||||||
|
@ -61,23 +64,30 @@ public class RpcMetrics {
|
||||||
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE,
|
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE,
|
||||||
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT);
|
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT);
|
||||||
if (rpcQuantileEnable) {
|
if (rpcQuantileEnable) {
|
||||||
rpcQueueTimeMillisQuantiles =
|
rpcQueueTimeQuantiles =
|
||||||
new MutableQuantiles[intervals.length];
|
new MutableQuantiles[intervals.length];
|
||||||
rpcProcessingTimeMillisQuantiles =
|
rpcLockWaitTimeQuantiles =
|
||||||
new MutableQuantiles[intervals.length];
|
new MutableQuantiles[intervals.length];
|
||||||
deferredRpcProcessingTimeMillisQuantiles =
|
rpcProcessingTimeQuantiles =
|
||||||
|
new MutableQuantiles[intervals.length];
|
||||||
|
deferredRpcProcessingTimeQuantiles =
|
||||||
new MutableQuantiles[intervals.length];
|
new MutableQuantiles[intervals.length];
|
||||||
for (int i = 0; i < intervals.length; i++) {
|
for (int i = 0; i < intervals.length; i++) {
|
||||||
int interval = intervals[i];
|
int interval = intervals[i];
|
||||||
rpcQueueTimeMillisQuantiles[i] = registry.newQuantiles("rpcQueueTime"
|
rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
|
||||||
+ interval + "s", "rpc queue time in milli second", "ops",
|
+ interval + "s", "rpc queue time in " + TIMEUNIT, "ops",
|
||||||
"latency", interval);
|
"latency", interval);
|
||||||
rpcProcessingTimeMillisQuantiles[i] = registry.newQuantiles(
|
rpcLockWaitTimeQuantiles[i] = registry.newQuantiles(
|
||||||
|
"rpcLockWaitTime" + interval + "s",
|
||||||
|
"rpc lock wait time in " + TIMEUNIT, "ops",
|
||||||
|
"latency", interval);
|
||||||
|
rpcProcessingTimeQuantiles[i] = registry.newQuantiles(
|
||||||
"rpcProcessingTime" + interval + "s",
|
"rpcProcessingTime" + interval + "s",
|
||||||
"rpc processing time in milli second", "ops", "latency", interval);
|
"rpc processing time in " + TIMEUNIT, "ops",
|
||||||
deferredRpcProcessingTimeMillisQuantiles[i] = registry
|
"latency", interval);
|
||||||
.newQuantiles("deferredRpcProcessingTime" + interval + "s",
|
deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
|
||||||
"deferred rpc processing time in milli seconds", "ops",
|
"deferredRpcProcessingTime" + interval + "s",
|
||||||
|
"deferred rpc processing time in " + TIMEUNIT, "ops",
|
||||||
"latency", interval);
|
"latency", interval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -94,11 +104,13 @@ public class RpcMetrics {
|
||||||
@Metric("Number of received bytes") MutableCounterLong receivedBytes;
|
@Metric("Number of received bytes") MutableCounterLong receivedBytes;
|
||||||
@Metric("Number of sent bytes") MutableCounterLong sentBytes;
|
@Metric("Number of sent bytes") MutableCounterLong sentBytes;
|
||||||
@Metric("Queue time") MutableRate rpcQueueTime;
|
@Metric("Queue time") MutableRate rpcQueueTime;
|
||||||
MutableQuantiles[] rpcQueueTimeMillisQuantiles;
|
MutableQuantiles[] rpcQueueTimeQuantiles;
|
||||||
|
@Metric("Lock wait time") MutableRate rpcLockWaitTime;
|
||||||
|
MutableQuantiles[] rpcLockWaitTimeQuantiles;
|
||||||
@Metric("Processing time") MutableRate rpcProcessingTime;
|
@Metric("Processing time") MutableRate rpcProcessingTime;
|
||||||
MutableQuantiles[] rpcProcessingTimeMillisQuantiles;
|
MutableQuantiles[] rpcProcessingTimeQuantiles;
|
||||||
@Metric("Deferred Processing time") MutableRate deferredRpcProcessingTime;
|
@Metric("Deferred Processing time") MutableRate deferredRpcProcessingTime;
|
||||||
MutableQuantiles[] deferredRpcProcessingTimeMillisQuantiles;
|
MutableQuantiles[] deferredRpcProcessingTimeQuantiles;
|
||||||
@Metric("Number of authentication failures")
|
@Metric("Number of authentication failures")
|
||||||
MutableCounterLong rpcAuthenticationFailures;
|
MutableCounterLong rpcAuthenticationFailures;
|
||||||
@Metric("Number of authentication successes")
|
@Metric("Number of authentication successes")
|
||||||
|
@ -196,25 +208,32 @@ public class RpcMetrics {
|
||||||
* Add an RPC queue time sample
|
* Add an RPC queue time sample
|
||||||
* @param qTime the queue time
|
* @param qTime the queue time
|
||||||
*/
|
*/
|
||||||
//@Override
|
public void addRpcQueueTime(long qTime) {
|
||||||
public void addRpcQueueTime(int qTime) {
|
|
||||||
rpcQueueTime.add(qTime);
|
rpcQueueTime.add(qTime);
|
||||||
if (rpcQuantileEnable) {
|
if (rpcQuantileEnable) {
|
||||||
for (MutableQuantiles q : rpcQueueTimeMillisQuantiles) {
|
for (MutableQuantiles q : rpcQueueTimeQuantiles) {
|
||||||
q.add(qTime);
|
q.add(qTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void addRpcLockWaitTime(long waitTime) {
|
||||||
|
rpcLockWaitTime.add(waitTime);
|
||||||
|
if (rpcQuantileEnable) {
|
||||||
|
for (MutableQuantiles q : rpcLockWaitTimeQuantiles) {
|
||||||
|
q.add(waitTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add an RPC processing time sample
|
* Add an RPC processing time sample
|
||||||
* @param processingTime the processing time
|
* @param processingTime the processing time
|
||||||
*/
|
*/
|
||||||
//@Override
|
public void addRpcProcessingTime(long processingTime) {
|
||||||
public void addRpcProcessingTime(int processingTime) {
|
|
||||||
rpcProcessingTime.add(processingTime);
|
rpcProcessingTime.add(processingTime);
|
||||||
if (rpcQuantileEnable) {
|
if (rpcQuantileEnable) {
|
||||||
for (MutableQuantiles q : rpcProcessingTimeMillisQuantiles) {
|
for (MutableQuantiles q : rpcProcessingTimeQuantiles) {
|
||||||
q.add(processingTime);
|
q.add(processingTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -223,7 +242,7 @@ public class RpcMetrics {
|
||||||
public void addDeferredRpcProcessingTime(long processingTime) {
|
public void addDeferredRpcProcessingTime(long processingTime) {
|
||||||
deferredRpcProcessingTime.add(processingTime);
|
deferredRpcProcessingTime.add(processingTime);
|
||||||
if (rpcQuantileEnable) {
|
if (rpcQuantileEnable) {
|
||||||
for (MutableQuantiles q : deferredRpcProcessingTimeMillisQuantiles) {
|
for (MutableQuantiles q : deferredRpcProcessingTimeQuantiles) {
|
||||||
q.add(processingTime);
|
q.add(processingTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,6 +71,8 @@ Each metrics record contains tags such as Hostname and port (number to which ser
|
||||||
| `SentBytes` | Total number of sent bytes |
|
| `SentBytes` | Total number of sent bytes |
|
||||||
| `RpcQueueTimeNumOps` | Total number of RPC calls |
|
| `RpcQueueTimeNumOps` | Total number of RPC calls |
|
||||||
| `RpcQueueTimeAvgTime` | Average queue time in milliseconds |
|
| `RpcQueueTimeAvgTime` | Average queue time in milliseconds |
|
||||||
|
| `RpcLockWaitTimeNumOps` | Total number of RPC call (same as RpcQueueTimeNumOps) |
|
||||||
|
| `RpcLockWaitTimeAvgTime` | Average time waiting for lock acquisition in milliseconds |
|
||||||
| `RpcProcessingTimeNumOps` | Total number of RPC calls (same to RpcQueueTimeNumOps) |
|
| `RpcProcessingTimeNumOps` | Total number of RPC calls (same to RpcQueueTimeNumOps) |
|
||||||
| `RpcProcessingAvgTime` | Average Processing time in milliseconds |
|
| `RpcProcessingAvgTime` | Average Processing time in milliseconds |
|
||||||
| `RpcAuthenticationFailures` | Total number of authentication failures |
|
| `RpcAuthenticationFailures` | Total number of authentication failures |
|
||||||
|
@ -92,6 +94,12 @@ Each metrics record contains tags such as Hostname and port (number to which ser
|
||||||
| `rpcProcessingTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
| `rpcProcessingTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
| `rpcProcessingTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
| `rpcProcessingTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
| `rpcProcessingTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
| `rpcProcessingTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
|
| `rpcLockWaitTime`*num*`sNumOps` | Shows total number of RPC calls (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
|
| `rpcLockWaitTime`*num*`s50thPercentileLatency` | Shows the 50th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
|
| `rpcLockWaitTime`*num*`s75thPercentileLatency` | Shows the 75th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
|
| `rpcLockWaitTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
|
| `rpcLockWaitTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
|
| `rpcLockWaitTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
|
|
||||||
RetryCache/NameNodeRetryCache
|
RetryCache/NameNodeRetryCache
|
||||||
-----------------------------
|
-----------------------------
|
||||||
|
@ -118,6 +126,7 @@ rpcdetailed context
|
||||||
===================
|
===================
|
||||||
|
|
||||||
Metrics of rpcdetailed context are exposed in unified manner by RPC layer. Two metrics are exposed for each RPC based on its name. Metrics named "(RPC method name)NumOps" indicates total number of method calls, and metrics named "(RPC method name)AvgTime" shows average turn around time for method calls in milliseconds.
|
Metrics of rpcdetailed context are exposed in unified manner by RPC layer. Two metrics are exposed for each RPC based on its name. Metrics named "(RPC method name)NumOps" indicates total number of method calls, and metrics named "(RPC method name)AvgTime" shows average turn around time for method calls in milliseconds.
|
||||||
|
Please note that the AvgTime metrics do not include time spent waiting to acquire locks on data structures (see RpcLockWaitTimeAvgTime).
|
||||||
|
|
||||||
rpcdetailed
|
rpcdetailed
|
||||||
-----------
|
-----------
|
||||||
|
|
|
@ -0,0 +1,61 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for ProcessingDetails time unit conversion and output.
|
||||||
|
*/
|
||||||
|
public class TestProcessingDetails {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the conversion of time values in various units in and out of the
|
||||||
|
* details are done properly.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testTimeConversion() {
|
||||||
|
ProcessingDetails details = new ProcessingDetails(TimeUnit.MICROSECONDS);
|
||||||
|
|
||||||
|
details.set(Timing.ENQUEUE, 10);
|
||||||
|
assertEquals(10, details.get(Timing.ENQUEUE));
|
||||||
|
assertEquals(10_000, details.get(Timing.ENQUEUE, TimeUnit.NANOSECONDS));
|
||||||
|
|
||||||
|
details.set(Timing.QUEUE, 20, TimeUnit.MILLISECONDS);
|
||||||
|
details.add(Timing.QUEUE, 20, TimeUnit.MICROSECONDS);
|
||||||
|
assertEquals(20_020, details.get(Timing.QUEUE));
|
||||||
|
assertEquals(0, details.get(Timing.QUEUE, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testToString() {
|
||||||
|
ProcessingDetails details = new ProcessingDetails(TimeUnit.MICROSECONDS);
|
||||||
|
details.set(Timing.ENQUEUE, 10);
|
||||||
|
details.set(Timing.QUEUE, 20, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
assertEquals("enqueueTime=10 queueTime=20000 handlerTime=0 " +
|
||||||
|
"processingTime=0 lockfreeTime=0 lockwaitTime=0 locksharedTime=0 " +
|
||||||
|
"lockexclusiveTime=0 responseTime=0", details.toString());
|
||||||
|
}
|
||||||
|
}
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto
|
||||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -41,6 +42,7 @@ import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
|
@ -215,7 +217,8 @@ public class TestProtoBufRpc extends TestRpcBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 12000)
|
@Test(timeout = 12000)
|
||||||
public void testLogSlowRPC() throws IOException, ServiceException {
|
public void testLogSlowRPC() throws IOException, ServiceException,
|
||||||
|
TimeoutException, InterruptedException {
|
||||||
TestRpcService2 client = getClient2();
|
TestRpcService2 client = getClient2();
|
||||||
// make 10 K fast calls
|
// make 10 K fast calls
|
||||||
for (int x = 0; x < 10000; x++) {
|
for (int x = 0; x < 10000; x++) {
|
||||||
|
@ -234,9 +237,9 @@ public class TestProtoBufRpc extends TestRpcBase {
|
||||||
// make a really slow call. Sleep sleeps for 1000ms
|
// make a really slow call. Sleep sleeps for 1000ms
|
||||||
client.sleep(null, newSleepRequest(SLEEP_DURATION * 3));
|
client.sleep(null, newSleepRequest(SLEEP_DURATION * 3));
|
||||||
|
|
||||||
long after = rpcMetrics.getRpcSlowCalls();
|
|
||||||
// Ensure slow call is logged.
|
// Ensure slow call is logged.
|
||||||
Assert.assertEquals(before + 1L, after);
|
GenericTestUtils.waitFor(()
|
||||||
|
-> rpcMetrics.getRpcSlowCalls() == before + 1L, 10, 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 12000)
|
@Test(timeout = 12000)
|
||||||
|
|
|
@ -87,6 +87,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
@ -1072,10 +1074,14 @@ public class TestRPC extends TestRpcBase {
|
||||||
}
|
}
|
||||||
MetricsRecordBuilder rpcMetrics =
|
MetricsRecordBuilder rpcMetrics =
|
||||||
getMetrics(server.getRpcMetrics().name());
|
getMetrics(server.getRpcMetrics().name());
|
||||||
assertTrue("Expected non-zero rpc queue time",
|
assertEquals("Expected correct rpc queue count",
|
||||||
getLongCounter("RpcQueueTimeNumOps", rpcMetrics) > 0);
|
3000, getLongCounter("RpcQueueTimeNumOps", rpcMetrics));
|
||||||
assertTrue("Expected non-zero rpc processing time",
|
assertEquals("Expected correct rpc processing count",
|
||||||
getLongCounter("RpcProcessingTimeNumOps", rpcMetrics) > 0);
|
3000, getLongCounter("RpcProcessingTimeNumOps", rpcMetrics));
|
||||||
|
assertEquals("Expected correct rpc lock wait count",
|
||||||
|
3000, getLongCounter("RpcLockWaitTimeNumOps", rpcMetrics));
|
||||||
|
assertEquals("Expected zero rpc lock wait time",
|
||||||
|
0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
|
||||||
MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
|
MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
|
||||||
rpcMetrics);
|
rpcMetrics);
|
||||||
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
|
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
|
||||||
|
@ -1086,6 +1092,10 @@ public class TestRPC extends TestRpcBase {
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName();
|
UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1"));
|
assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1"));
|
||||||
assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1"));
|
assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1"));
|
||||||
|
|
||||||
|
proxy.lockAndSleep(null, newSleepRequest(5));
|
||||||
|
rpcMetrics = getMetrics(server.getRpcMetrics().name());
|
||||||
|
assertGauge("RpcLockWaitTimeAvgTime", 10000.0, rpcMetrics);
|
||||||
} finally {
|
} finally {
|
||||||
if (proxy2 != null) {
|
if (proxy2 != null) {
|
||||||
RPC.stopProxy(proxy2);
|
RPC.stopProxy(proxy2);
|
||||||
|
|
|
@ -21,12 +21,16 @@ package org.apache.hadoop.ipc;
|
||||||
import com.google.protobuf.BlockingService;
|
import com.google.protobuf.BlockingService;
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
||||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
@ -278,6 +282,7 @@ public class TestRpcBase {
|
||||||
public static class PBServerImpl implements TestRpcService {
|
public static class PBServerImpl implements TestRpcService {
|
||||||
CountDownLatch fastPingCounter = new CountDownLatch(2);
|
CountDownLatch fastPingCounter = new CountDownLatch(2);
|
||||||
private List<Server.Call> postponedCalls = new ArrayList<>();
|
private List<Server.Call> postponedCalls = new ArrayList<>();
|
||||||
|
private final Lock lock = new ReentrantLock();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TestProtos.EmptyResponseProto ping(RpcController unused,
|
public TestProtos.EmptyResponseProto ping(RpcController unused,
|
||||||
|
@ -388,6 +393,29 @@ public class TestRpcBase {
|
||||||
return TestProtos.EmptyResponseProto.newBuilder().build();
|
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EmptyResponseProto lockAndSleep(
|
||||||
|
RpcController controller, TestProtos.SleepRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
ProcessingDetails details =
|
||||||
|
Server.getCurCall().get().getProcessingDetails();
|
||||||
|
lock.lock();
|
||||||
|
long startNanos = Time.monotonicNowNanos();
|
||||||
|
try {
|
||||||
|
Thread.sleep(request.getMilliSeconds());
|
||||||
|
} catch (InterruptedException ignore) {
|
||||||
|
// ignore
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
// Add some arbitrary large lock wait time since in any test scenario
|
||||||
|
// the lock wait time will probably actually be too small to notice
|
||||||
|
details.add(ProcessingDetails.Timing.LOCKWAIT, 10, TimeUnit.SECONDS);
|
||||||
|
details.add(ProcessingDetails.Timing.LOCKEXCLUSIVE,
|
||||||
|
Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS);
|
||||||
|
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TestProtos.AuthMethodResponseProto getAuthMethod(
|
public TestProtos.AuthMethodResponseProto getAuthMethod(
|
||||||
RpcController controller, TestProtos.EmptyRequestProto request)
|
RpcController controller, TestProtos.EmptyRequestProto request)
|
||||||
|
|
|
@ -39,6 +39,7 @@ service TestProtobufRpcProto {
|
||||||
rpc testServerGet(EmptyRequestProto) returns (EmptyResponseProto);
|
rpc testServerGet(EmptyRequestProto) returns (EmptyResponseProto);
|
||||||
rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
|
rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
|
||||||
rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
|
rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
|
||||||
|
rpc lockAndSleep(SleepRequestProto) returns (EmptyResponseProto);
|
||||||
rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto);
|
rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto);
|
||||||
rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto);
|
rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto);
|
||||||
rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto);
|
rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto);
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.log.LogThrottlingHelper;
|
import org.apache.hadoop.log.LogThrottlingHelper;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
|
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
@ -41,6 +42,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORT
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
|
||||||
|
import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
|
||||||
import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
|
import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -142,17 +144,11 @@ class FSNamesystemLock {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readLock() {
|
public void readLock() {
|
||||||
coarseLock.readLock().lock();
|
doLock(false);
|
||||||
if (coarseLock.getReadHoldCount() == 1) {
|
|
||||||
readLockHeldTimeStampNanos.set(timer.monotonicNowNanos());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readLockInterruptibly() throws InterruptedException {
|
public void readLockInterruptibly() throws InterruptedException {
|
||||||
coarseLock.readLock().lockInterruptibly();
|
doLockInterruptibly(false);
|
||||||
if (coarseLock.getReadHoldCount() == 1) {
|
|
||||||
readLockHeldTimeStampNanos.set(timer.monotonicNowNanos());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readUnlock() {
|
public void readUnlock() {
|
||||||
|
@ -204,17 +200,11 @@ class FSNamesystemLock {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void writeLock() {
|
public void writeLock() {
|
||||||
coarseLock.writeLock().lock();
|
doLock(true);
|
||||||
if (coarseLock.getWriteHoldCount() == 1) {
|
|
||||||
writeLockHeldTimeStampNanos = timer.monotonicNowNanos();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void writeLockInterruptibly() throws InterruptedException {
|
public void writeLockInterruptibly() throws InterruptedException {
|
||||||
coarseLock.writeLock().lockInterruptibly();
|
doLockInterruptibly(true);
|
||||||
if (coarseLock.getWriteHoldCount() == 1) {
|
|
||||||
writeLockHeldTimeStampNanos = timer.monotonicNowNanos();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -316,6 +306,50 @@ class FSNamesystemLock {
|
||||||
String overallMetric = getMetricName(OVERALL_METRIC_NAME, isWrite);
|
String overallMetric = getMetricName(OVERALL_METRIC_NAME, isWrite);
|
||||||
detailedHoldTimeMetrics.add(overallMetric, value);
|
detailedHoldTimeMetrics.add(overallMetric, value);
|
||||||
}
|
}
|
||||||
|
updateProcessingDetails(
|
||||||
|
isWrite ? Timing.LOCKEXCLUSIVE : Timing.LOCKSHARED, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doLock(boolean isWrite) {
|
||||||
|
long startNanos = timer.monotonicNowNanos();
|
||||||
|
if (isWrite) {
|
||||||
|
coarseLock.writeLock().lock();
|
||||||
|
} else {
|
||||||
|
coarseLock.readLock().lock();
|
||||||
|
}
|
||||||
|
updateLockWait(startNanos, isWrite);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doLockInterruptibly(boolean isWrite)
|
||||||
|
throws InterruptedException {
|
||||||
|
long startNanos = timer.monotonicNowNanos();
|
||||||
|
if (isWrite) {
|
||||||
|
coarseLock.writeLock().lockInterruptibly();
|
||||||
|
} else {
|
||||||
|
coarseLock.readLock().lockInterruptibly();
|
||||||
|
}
|
||||||
|
updateLockWait(startNanos, isWrite);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateLockWait(long startNanos, boolean isWrite) {
|
||||||
|
long now = timer.monotonicNowNanos();
|
||||||
|
updateProcessingDetails(Timing.LOCKWAIT, now - startNanos);
|
||||||
|
if (isWrite) {
|
||||||
|
if (coarseLock.getWriteHoldCount() == 1) {
|
||||||
|
writeLockHeldTimeStampNanos = now;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (coarseLock.getReadHoldCount() == 1) {
|
||||||
|
readLockHeldTimeStampNanos.set(now);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void updateProcessingDetails(Timing type, long deltaNanos) {
|
||||||
|
Server.Call call = Server.getCurCall().get();
|
||||||
|
if (call != null) {
|
||||||
|
call.getProcessingDetails().add(type, deltaNanos, TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getMetricName(String operationName, boolean isWrite) {
|
private static String getMetricName(String operationName, boolean isWrite) {
|
||||||
|
|
|
@ -373,11 +373,6 @@ public class TestConsistentReadsObserver {
|
||||||
return --allowed < 0;
|
return --allowed < 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addResponseTime(String name, int priorityLevel, int queueTime,
|
|
||||||
int processingTime) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue