HADOOP-15726. Create utility to limit frequency of log statements. Contributed by Erik Krogen.
This commit is contained in:
parent
ffef819ee8
commit
78e0c0a1bb
|
@ -0,0 +1,358 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.log;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
|
||||||
|
import org.apache.hadoop.util.Timer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a class to help easily throttle log statements, so that they will
|
||||||
|
* not be emitted more frequently than a certain rate. It is useful to help
|
||||||
|
* prevent flooding the application logs with redundant messages.
|
||||||
|
*
|
||||||
|
* The instantiator specifies a minimum period at which statements should be
|
||||||
|
* logged. When {@link #record(double...)} is called, if enough time has elapsed
|
||||||
|
* since the last time it was called, the return value will indicate to the
|
||||||
|
* caller that it should write to its actual log. Note that this class does not
|
||||||
|
* write to any actual log; it only records information about how many times
|
||||||
|
* {@code record} has been called and with what arguments, and indicates to the
|
||||||
|
* caller whether or not it should write to its log. If not enough time has yet
|
||||||
|
* elapsed, this class records the arguments and updates its summary
|
||||||
|
* information, and indicates to the caller that it should not log.
|
||||||
|
*
|
||||||
|
* For example, say that you want to know whenever too large of a request is
|
||||||
|
* received, but want to avoid flooding the logs if many such requests are
|
||||||
|
* received.
|
||||||
|
* <pre>{@code
|
||||||
|
* // Helper with a minimum period of 5 seconds
|
||||||
|
* private LogThrottlingHelper helper = new LogThrottlingHelper(5000);
|
||||||
|
*
|
||||||
|
* public void receiveRequest(int requestedObjects) {
|
||||||
|
* if (requestedObjects > MAXIMUM_REQUEST_SIZE) {
|
||||||
|
* LogAction logAction = helper.record(requestedObjects);
|
||||||
|
* if (logAction.shouldLog()) {
|
||||||
|
* LOG.warn("Received {} large request(s) with a total of {} objects " +
|
||||||
|
* "requested; maximum objects requested was {}",
|
||||||
|
* logAction.getCount(), logAction.getStats(0).getSum(),
|
||||||
|
* logAction.getStats(0).getMax());
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* }</pre>
|
||||||
|
* The above snippet allows you to record extraneous events, but if they become
|
||||||
|
* frequent, to limit their presence in the log to only every 5 seconds while
|
||||||
|
* still maintaining overall information about how many large requests were
|
||||||
|
* received.
|
||||||
|
*
|
||||||
|
* <p/>This class can also be used to coordinate multiple logging points; see
|
||||||
|
* {@link #record(String, long, double...)} for more details.
|
||||||
|
*
|
||||||
|
* <p/>This class is not thread-safe.
|
||||||
|
*/
|
||||||
|
public class LogThrottlingHelper {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An indication of what action the caller should take. If
|
||||||
|
* {@link #shouldLog()} is false, no other action should be taken, and it is
|
||||||
|
* an error to try to access any of the summary information. If
|
||||||
|
* {@link #shouldLog()} is true, then the caller should write to its log, and
|
||||||
|
* can use the {@link #getCount()} and {@link #getStats(int)} methods to
|
||||||
|
* determine summary information about what has been recorded into this
|
||||||
|
* helper.
|
||||||
|
*
|
||||||
|
* All summary information in this action only represents
|
||||||
|
* {@link #record(double...)} statements which were called <i>after</i> the
|
||||||
|
* last time the caller logged something; that is, since the last time a log
|
||||||
|
* action was returned with a true value for {@link #shouldLog()}. Information
|
||||||
|
* about the {@link #record(double...)} statement which created this log
|
||||||
|
* action is included.
|
||||||
|
*/
|
||||||
|
public interface LogAction {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the number of records encapsulated in this action; that is, the
|
||||||
|
* number of times {@code record} was called to produce this action,
|
||||||
|
* including the current one.
|
||||||
|
*/
|
||||||
|
int getCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return summary information for the value that was recorded at index
|
||||||
|
* {@code idx}. Corresponds to the ordering of values passed to
|
||||||
|
* {@link #record(double...)}.
|
||||||
|
*/
|
||||||
|
SummaryStatistics getStats(int idx);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If this is true, the caller should write to its log. Otherwise, the
|
||||||
|
* caller should take no action, and it is an error to call other methods
|
||||||
|
* on this object.
|
||||||
|
*/
|
||||||
|
boolean shouldLog();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link LogAction} representing a state that should not yet be logged.
|
||||||
|
* If any attempt is made to extract information from this, it will throw
|
||||||
|
* an {@link IllegalStateException}.
|
||||||
|
*/
|
||||||
|
public static final LogAction DO_NOT_LOG = new NoLogAction();
|
||||||
|
private static final String DEFAULT_RECORDER_NAME =
|
||||||
|
"__DEFAULT_RECORDER_NAME__";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This throttler will not trigger log statements more frequently than this
|
||||||
|
* period.
|
||||||
|
*/
|
||||||
|
private final long minLogPeriodMs;
|
||||||
|
/**
|
||||||
|
* The name of the recorder treated as the primary; this is the only one which
|
||||||
|
* will trigger logging. Other recorders are dependent on the state of this
|
||||||
|
* recorder. This may be null, in which case a primary has not yet been set.
|
||||||
|
*/
|
||||||
|
private String primaryRecorderName;
|
||||||
|
private final Timer timer;
|
||||||
|
private final Map<String, LoggingAction> currentLogs;
|
||||||
|
|
||||||
|
private long lastLogTimestampMs = Long.MIN_VALUE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a log helper without any primary recorder.
|
||||||
|
*
|
||||||
|
* @see #LogThrottlingHelper(long, String)
|
||||||
|
*/
|
||||||
|
public LogThrottlingHelper(long minLogPeriodMs) {
|
||||||
|
this(minLogPeriodMs, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a log helper with a specified primary recorder name; this can be
|
||||||
|
* used in conjunction with {@link #record(String, long, double...)} to set up
|
||||||
|
* primary and dependent recorders. See
|
||||||
|
* {@link #record(String, long, double...)} for more details.
|
||||||
|
*
|
||||||
|
* @param minLogPeriodMs The minimum period with which to log; do not log
|
||||||
|
* more frequently than this.
|
||||||
|
* @param primaryRecorderName The name of the primary recorder.
|
||||||
|
*/
|
||||||
|
public LogThrottlingHelper(long minLogPeriodMs, String primaryRecorderName) {
|
||||||
|
this(minLogPeriodMs, primaryRecorderName, new Timer());
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
LogThrottlingHelper(long minLogPeriodMs, String primaryRecorderName,
|
||||||
|
Timer timer) {
|
||||||
|
this.minLogPeriodMs = minLogPeriodMs;
|
||||||
|
this.primaryRecorderName = primaryRecorderName;
|
||||||
|
this.timer = timer;
|
||||||
|
this.currentLogs = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record some set of values at the current time into this helper. Note that
|
||||||
|
* this does <i>not</i> actually write information to any log. Instead, this
|
||||||
|
* will return a LogAction indicating whether or not the caller should write
|
||||||
|
* to its own log. The LogAction will additionally contain summary information
|
||||||
|
* about the values specified since the last time the caller was expected to
|
||||||
|
* write to its log.
|
||||||
|
*
|
||||||
|
* <p/>Specifying multiple values will maintain separate summary statistics
|
||||||
|
* about each value. For example:
|
||||||
|
* <pre>{@code
|
||||||
|
* helper.record(1, 0);
|
||||||
|
* LogAction action = helper.record(3, 100);
|
||||||
|
* action.getStats(0); // == 2
|
||||||
|
* action.getStats(1); // == 50
|
||||||
|
* }</pre>
|
||||||
|
*
|
||||||
|
* @param values The values about which to maintain summary information. Every
|
||||||
|
* time this method is called, the same number of values must
|
||||||
|
* be specified.
|
||||||
|
* @return A LogAction indicating whether or not the caller should write to
|
||||||
|
* its log.
|
||||||
|
*/
|
||||||
|
public LogAction record(double... values) {
|
||||||
|
return record(DEFAULT_RECORDER_NAME, timer.monotonicNow(), values);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record some set of values at the specified time into this helper. This can
|
||||||
|
* be useful to avoid fetching the current time twice if the caller has
|
||||||
|
* already done so for other purposes. This additionally allows the caller to
|
||||||
|
* specify a name for this recorder. When multiple names are used, one is
|
||||||
|
* denoted as the primary recorder. Only recorders named as the primary
|
||||||
|
* will trigger logging; other names not matching the primary can <i>only</i>
|
||||||
|
* be triggered by following the primary. This is used to coordinate multiple
|
||||||
|
* logging points. A primary can be set via the
|
||||||
|
* {@link #LogThrottlingHelper(long, String)} constructor. If no primary
|
||||||
|
* is set in the constructor, then the first recorder name used becomes the
|
||||||
|
* primary.
|
||||||
|
*
|
||||||
|
* If multiple names are used, they maintain entirely different sets of values
|
||||||
|
* and summary information. For example:
|
||||||
|
* <pre>{@code
|
||||||
|
* // Initialize "pre" as the primary recorder name
|
||||||
|
* LogThrottlingHelper helper = new LogThrottlingHelper(1000, "pre");
|
||||||
|
* LogAction preLog = helper.record("pre", Time.monotonicNow());
|
||||||
|
* if (preLog.shouldLog()) {
|
||||||
|
* // ...
|
||||||
|
* }
|
||||||
|
* double eventsProcessed = ... // perform some action
|
||||||
|
* LogAction postLog =
|
||||||
|
* helper.record("post", Time.monotonicNow(), eventsProcessed);
|
||||||
|
* if (postLog.shouldLog()) {
|
||||||
|
* // ...
|
||||||
|
* // Can use postLog.getStats(0) to access eventsProcessed information
|
||||||
|
* }
|
||||||
|
* }</pre>
|
||||||
|
* Since "pre" is the primary recorder name, logging to "pre" will trigger a
|
||||||
|
* log action if enough time has elapsed. This will indicate that "post"
|
||||||
|
* should log as well. This ensures that "post" is always logged in the same
|
||||||
|
* iteration as "pre", yet each one is able to maintain its own summary
|
||||||
|
* information.
|
||||||
|
*
|
||||||
|
* <p/>Other behavior is the same as {@link #record(double...)}.
|
||||||
|
*
|
||||||
|
* @param recorderName The name of the recorder. This is used to check if the
|
||||||
|
* current recorder is the primary. Other names are
|
||||||
|
* arbitrary and are only used to differentiate between
|
||||||
|
* distinct recorders.
|
||||||
|
* @param currentTimeMs The current time.
|
||||||
|
* @param values The values to log.
|
||||||
|
* @return The LogAction for the specified recorder.
|
||||||
|
*
|
||||||
|
* @see #record(double...)
|
||||||
|
*/
|
||||||
|
public LogAction record(String recorderName, long currentTimeMs,
|
||||||
|
double... values) {
|
||||||
|
if (primaryRecorderName == null) {
|
||||||
|
primaryRecorderName = recorderName;
|
||||||
|
}
|
||||||
|
LoggingAction currentLog = currentLogs.get(recorderName);
|
||||||
|
if (currentLog == null || currentLog.hasLogged()) {
|
||||||
|
currentLog = new LoggingAction(values.length);
|
||||||
|
if (!currentLogs.containsKey(recorderName)) {
|
||||||
|
// Always log newly created loggers
|
||||||
|
currentLog.setShouldLog();
|
||||||
|
}
|
||||||
|
currentLogs.put(recorderName, currentLog);
|
||||||
|
}
|
||||||
|
currentLog.recordValues(values);
|
||||||
|
if (primaryRecorderName.equals(recorderName) &&
|
||||||
|
currentTimeMs - minLogPeriodMs >= lastLogTimestampMs) {
|
||||||
|
lastLogTimestampMs = currentTimeMs;
|
||||||
|
for (LoggingAction log : currentLogs.values()) {
|
||||||
|
log.setShouldLog();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (currentLog.shouldLog()) {
|
||||||
|
currentLog.setHasLogged();
|
||||||
|
return currentLog;
|
||||||
|
} else {
|
||||||
|
return DO_NOT_LOG;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A standard log action which keeps track of all of the values which have
|
||||||
|
* been logged. This is also used for internal bookkeeping via its private
|
||||||
|
* fields and methods; it will maintain whether or not it is ready to be
|
||||||
|
* logged ({@link #shouldLog()}) as well as whether or not it has been
|
||||||
|
* returned for logging yet ({@link #hasLogged()}).
|
||||||
|
*/
|
||||||
|
private static class LoggingAction implements LogAction {
|
||||||
|
|
||||||
|
private int count = 0;
|
||||||
|
private final SummaryStatistics[] stats;
|
||||||
|
private boolean shouldLog = false;
|
||||||
|
private boolean hasLogged = false;
|
||||||
|
|
||||||
|
LoggingAction(int valueCount) {
|
||||||
|
stats = new SummaryStatistics[valueCount];
|
||||||
|
for (int i = 0; i < stats.length; i++) {
|
||||||
|
stats[i] = new SummaryStatistics();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getCount() {
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SummaryStatistics getStats(int idx) {
|
||||||
|
if (idx < 0 || idx >= stats.length) {
|
||||||
|
throw new IllegalArgumentException("Requested stats at idx " + idx +
|
||||||
|
" but this log only maintains " + stats.length + " stats");
|
||||||
|
}
|
||||||
|
return stats[idx];
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean shouldLog() {
|
||||||
|
return shouldLog;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setShouldLog() {
|
||||||
|
shouldLog = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean hasLogged() {
|
||||||
|
return hasLogged;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setHasLogged() {
|
||||||
|
hasLogged = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void recordValues(double... values) {
|
||||||
|
if (values.length != stats.length) {
|
||||||
|
throw new IllegalArgumentException("received " + values.length +
|
||||||
|
" values but expected " + stats.length);
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
for (int i = 0; i < values.length; i++) {
|
||||||
|
stats[i].addValue(values[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A non-logging action.
|
||||||
|
*
|
||||||
|
* @see #DO_NOT_LOG
|
||||||
|
*/
|
||||||
|
private static class NoLogAction implements LogAction {
|
||||||
|
|
||||||
|
public int getCount() {
|
||||||
|
throw new IllegalStateException("Cannot be logged yet!");
|
||||||
|
}
|
||||||
|
|
||||||
|
public SummaryStatistics getStats(int idx) {
|
||||||
|
throw new IllegalStateException("Cannot be logged yet!");
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean shouldLog() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,172 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.log;
|
||||||
|
|
||||||
|
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
|
||||||
|
import org.apache.hadoop.util.FakeTimer;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for {@link LogThrottlingHelper}.
|
||||||
|
*/
|
||||||
|
public class TestLogThrottlingHelper {
|
||||||
|
|
||||||
|
private static final int LOG_PERIOD = 100;
|
||||||
|
|
||||||
|
private LogThrottlingHelper helper;
|
||||||
|
private FakeTimer timer;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
timer = new FakeTimer();
|
||||||
|
helper = new LogThrottlingHelper(LOG_PERIOD, null, timer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBasicLogging() {
|
||||||
|
assertTrue(helper.record().shouldLog());
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
timer.advance(LOG_PERIOD / 10);
|
||||||
|
assertFalse(helper.record().shouldLog());
|
||||||
|
}
|
||||||
|
timer.advance(LOG_PERIOD);
|
||||||
|
assertTrue(helper.record().shouldLog());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLoggingWithValue() {
|
||||||
|
assertTrue(helper.record(1).shouldLog());
|
||||||
|
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
timer.advance(LOG_PERIOD / 5);
|
||||||
|
assertFalse(helper.record(i % 2 == 0 ? 0 : 1).shouldLog());
|
||||||
|
}
|
||||||
|
|
||||||
|
timer.advance(LOG_PERIOD);
|
||||||
|
LogAction action = helper.record(0.5);
|
||||||
|
assertTrue(action.shouldLog());
|
||||||
|
assertEquals(5, action.getCount());
|
||||||
|
assertEquals(0.5, action.getStats(0).getMean(), 0.01);
|
||||||
|
assertEquals(1.0, action.getStats(0).getMax(), 0.01);
|
||||||
|
assertEquals(0.0, action.getStats(0).getMin(), 0.01);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLoggingWithMultipleValues() {
|
||||||
|
assertTrue(helper.record(1).shouldLog());
|
||||||
|
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
timer.advance(LOG_PERIOD / 5);
|
||||||
|
int base = i % 2 == 0 ? 0 : 1;
|
||||||
|
assertFalse(helper.record(base, base * 2).shouldLog());
|
||||||
|
}
|
||||||
|
|
||||||
|
timer.advance(LOG_PERIOD);
|
||||||
|
LogAction action = helper.record(0.5, 1.0);
|
||||||
|
assertTrue(action.shouldLog());
|
||||||
|
assertEquals(5, action.getCount());
|
||||||
|
for (int i = 1; i <= 2; i++) {
|
||||||
|
assertEquals(0.5 * i, action.getStats(i - 1).getMean(), 0.01);
|
||||||
|
assertEquals(1.0 * i, action.getStats(i - 1).getMax(), 0.01);
|
||||||
|
assertEquals(0.0, action.getStats(i - 1).getMin(), 0.01);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testLoggingWithInconsistentValues() {
|
||||||
|
assertTrue(helper.record(1, 2).shouldLog());
|
||||||
|
helper.record(1, 2);
|
||||||
|
helper.record(1, 2, 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNamedLoggersWithoutSpecifiedPrimary() {
|
||||||
|
assertTrue(helper.record("foo", 0).shouldLog());
|
||||||
|
assertTrue(helper.record("bar", 0).shouldLog());
|
||||||
|
|
||||||
|
assertFalse(helper.record("foo", LOG_PERIOD / 2).shouldLog());
|
||||||
|
assertFalse(helper.record("bar", LOG_PERIOD / 2).shouldLog());
|
||||||
|
|
||||||
|
assertTrue(helper.record("foo", LOG_PERIOD).shouldLog());
|
||||||
|
assertTrue(helper.record("bar", LOG_PERIOD).shouldLog());
|
||||||
|
|
||||||
|
assertFalse(helper.record("foo", (LOG_PERIOD * 3) / 2).shouldLog());
|
||||||
|
assertFalse(helper.record("bar", (LOG_PERIOD * 3) / 2).shouldLog());
|
||||||
|
|
||||||
|
assertFalse(helper.record("bar", LOG_PERIOD * 2).shouldLog());
|
||||||
|
assertTrue(helper.record("foo", LOG_PERIOD * 2).shouldLog());
|
||||||
|
assertTrue(helper.record("bar", LOG_PERIOD * 2).shouldLog());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPrimaryAndDependentLoggers() {
|
||||||
|
helper = new LogThrottlingHelper(LOG_PERIOD, "foo", timer);
|
||||||
|
|
||||||
|
assertTrue(helper.record("foo", 0).shouldLog());
|
||||||
|
assertTrue(helper.record("bar", 0).shouldLog());
|
||||||
|
assertFalse(helper.record("bar", 0).shouldLog());
|
||||||
|
assertFalse(helper.record("foo", 0).shouldLog());
|
||||||
|
|
||||||
|
assertFalse(helper.record("foo", LOG_PERIOD / 2).shouldLog());
|
||||||
|
assertFalse(helper.record("bar", LOG_PERIOD / 2).shouldLog());
|
||||||
|
|
||||||
|
// Both should log once the period has elapsed
|
||||||
|
assertTrue(helper.record("foo", LOG_PERIOD).shouldLog());
|
||||||
|
assertTrue(helper.record("bar", LOG_PERIOD).shouldLog());
|
||||||
|
|
||||||
|
// "bar" should not log yet because "foo" hasn't been triggered
|
||||||
|
assertFalse(helper.record("bar", LOG_PERIOD * 2).shouldLog());
|
||||||
|
assertTrue(helper.record("foo", LOG_PERIOD * 2).shouldLog());
|
||||||
|
// The timing of "bar" shouldn't matter as it is dependent on "foo"
|
||||||
|
assertTrue(helper.record("bar", 0).shouldLog());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleLoggersWithValues() {
|
||||||
|
helper = new LogThrottlingHelper(LOG_PERIOD, "foo", timer);
|
||||||
|
|
||||||
|
assertTrue(helper.record("foo", 0).shouldLog());
|
||||||
|
assertTrue(helper.record("bar", 0, 2).shouldLog());
|
||||||
|
assertTrue(helper.record("baz", 0, 3, 3).shouldLog());
|
||||||
|
|
||||||
|
// "bar"/"baz" should not log yet because "foo" hasn't been triggered
|
||||||
|
assertFalse(helper.record("bar", LOG_PERIOD, 2).shouldLog());
|
||||||
|
assertFalse(helper.record("baz", LOG_PERIOD, 3, 3).shouldLog());
|
||||||
|
|
||||||
|
// All should log once the period has elapsed
|
||||||
|
LogAction foo = helper.record("foo", LOG_PERIOD);
|
||||||
|
LogAction bar = helper.record("bar", LOG_PERIOD, 2);
|
||||||
|
LogAction baz = helper.record("baz", LOG_PERIOD, 3, 3);
|
||||||
|
assertTrue(foo.shouldLog());
|
||||||
|
assertTrue(bar.shouldLog());
|
||||||
|
assertTrue(baz.shouldLog());
|
||||||
|
assertEquals(1, foo.getCount());
|
||||||
|
assertEquals(2, bar.getCount());
|
||||||
|
assertEquals(2, baz.getCount());
|
||||||
|
assertEquals(2.0, bar.getStats(0).getMean(), 0.01);
|
||||||
|
assertEquals(3.0, baz.getStats(0).getMean(), 0.01);
|
||||||
|
assertEquals(3.0, baz.getStats(1).getMean(), 0.01);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -26,6 +26,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.log.LogThrottlingHelper;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
|
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Timer;
|
import org.apache.hadoop.util.Timer;
|
||||||
|
@ -40,6 +41,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORT
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
|
||||||
|
import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mimics a ReentrantReadWriteLock but does not directly implement the interface
|
* Mimics a ReentrantReadWriteLock but does not directly implement the interface
|
||||||
|
@ -74,11 +76,8 @@ class FSNamesystemLock {
|
||||||
private final long writeLockReportingThresholdMs;
|
private final long writeLockReportingThresholdMs;
|
||||||
/** Last time stamp for write lock. Keep the longest one for multi-entrance.*/
|
/** Last time stamp for write lock. Keep the longest one for multi-entrance.*/
|
||||||
private long writeLockHeldTimeStampNanos;
|
private long writeLockHeldTimeStampNanos;
|
||||||
private int numWriteLockWarningsSuppressed = 0;
|
/** Frequency limiter used for reporting long write lock hold times. */
|
||||||
/** Time stamp (ms) of the last time a write lock report was written. */
|
private final LogThrottlingHelper writeLockReportLogger;
|
||||||
private long timeStampOfLastWriteLockReportMs = 0;
|
|
||||||
/** Longest time (ms) a write lock was held since the last report. */
|
|
||||||
private long longestWriteLockHeldIntervalMs = 0;
|
|
||||||
|
|
||||||
/** Threshold (ms) for long holding read lock report. */
|
/** Threshold (ms) for long holding read lock report. */
|
||||||
private final long readLockReportingThresholdMs;
|
private final long readLockReportingThresholdMs;
|
||||||
|
@ -132,6 +131,8 @@ class FSNamesystemLock {
|
||||||
this.lockSuppressWarningIntervalMs = conf.getTimeDuration(
|
this.lockSuppressWarningIntervalMs = conf.getTimeDuration(
|
||||||
DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
||||||
DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
|
DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
|
||||||
|
this.writeLockReportLogger =
|
||||||
|
new LogThrottlingHelper(lockSuppressWarningIntervalMs);
|
||||||
this.metricsEnabled = conf.getBoolean(
|
this.metricsEnabled = conf.getBoolean(
|
||||||
DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY,
|
DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY,
|
||||||
DFS_NAMENODE_LOCK_DETAILED_METRICS_DEFAULT);
|
DFS_NAMENODE_LOCK_DETAILED_METRICS_DEFAULT);
|
||||||
|
@ -230,25 +231,11 @@ class FSNamesystemLock {
|
||||||
final long writeLockIntervalMs =
|
final long writeLockIntervalMs =
|
||||||
TimeUnit.NANOSECONDS.toMillis(writeLockIntervalNanos);
|
TimeUnit.NANOSECONDS.toMillis(writeLockIntervalNanos);
|
||||||
|
|
||||||
boolean logReport = false;
|
LogAction logAction = LogThrottlingHelper.DO_NOT_LOG;
|
||||||
int numSuppressedWarnings = 0;
|
|
||||||
long longestLockHeldIntervalMs = 0;
|
|
||||||
if (needReport &&
|
if (needReport &&
|
||||||
writeLockIntervalMs >= this.writeLockReportingThresholdMs) {
|
writeLockIntervalMs >= this.writeLockReportingThresholdMs) {
|
||||||
if (writeLockIntervalMs > longestWriteLockHeldIntervalMs) {
|
logAction = writeLockReportLogger
|
||||||
longestWriteLockHeldIntervalMs = writeLockIntervalMs;
|
.record("write", currentTimeMs, writeLockIntervalMs);
|
||||||
}
|
|
||||||
if (currentTimeMs - timeStampOfLastWriteLockReportMs >
|
|
||||||
this.lockSuppressWarningIntervalMs) {
|
|
||||||
logReport = true;
|
|
||||||
numSuppressedWarnings = numWriteLockWarningsSuppressed;
|
|
||||||
numWriteLockWarningsSuppressed = 0;
|
|
||||||
longestLockHeldIntervalMs = longestWriteLockHeldIntervalMs;
|
|
||||||
longestWriteLockHeldIntervalMs = 0;
|
|
||||||
timeStampOfLastWriteLockReportMs = currentTimeMs;
|
|
||||||
} else {
|
|
||||||
numWriteLockWarningsSuppressed++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
coarseLock.writeLock().unlock();
|
coarseLock.writeLock().unlock();
|
||||||
|
@ -257,13 +244,16 @@ class FSNamesystemLock {
|
||||||
addMetric(opName, writeLockIntervalNanos, true);
|
addMetric(opName, writeLockIntervalNanos, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logReport) {
|
if (logAction.shouldLog()) {
|
||||||
FSNamesystem.LOG.info("FSNamesystem write lock held for " +
|
FSNamesystem.LOG.info("FSNamesystem write lock held for " +
|
||||||
writeLockIntervalMs + " ms via\n" +
|
writeLockIntervalMs + " ms via " +
|
||||||
StringUtils.getStackTrace(Thread.currentThread()) +
|
StringUtils.getStackTrace(Thread.currentThread()) +
|
||||||
"\tNumber of suppressed write-lock reports: " +
|
"\tNumber of suppressed write-lock reports: " +
|
||||||
numSuppressedWarnings + "\n\tLongest write-lock held interval: " +
|
(logAction.getCount() - 1) +
|
||||||
longestLockHeldIntervalMs);
|
"\n\tLongest write-lock held interval: " +
|
||||||
|
logAction.getStats(0).getMax() +
|
||||||
|
"\n\tTotal suppressed write-lock held time: " +
|
||||||
|
(logAction.getStats(0).getSum() - writeLockIntervalMs));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue