diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java new file mode 100644 index 00000000000..aa4e61c570b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.log; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.math3.stat.descriptive.SummaryStatistics; +import org.apache.hadoop.util.Timer; + +/** + * This is a class to help easily throttle log statements, so that they will + * not be emitted more frequently than a certain rate. It is useful to help + * prevent flooding the application logs with redundant messages. + * + * The instantiator specifies a minimum period at which statements should be + * logged. When {@link #record(double...)} is called, if enough time has elapsed + * since the last time logging was triggered, the return value will indicate to the + * caller that it should write to its actual log. 
Note that this class does not + * write to any actual log; it only records information about how many times + * {@code record} has been called and with what arguments, and indicates to the + * caller whether or not it should write to its log. If not enough time has yet + * elapsed, this class records the arguments and updates its summary + * information, and indicates to the caller that it should not log. + * + * For example, say that you want to know whenever too large of a request is + * received, but want to avoid flooding the logs if many such requests are + * received. + *
{@code + * // Helper with a minimum period of 5 seconds + * private LogThrottlingHelper helper = new LogThrottlingHelper(5000); + * + * public void receiveRequest(int requestedObjects) { + * if (requestedObjects > MAXIMUM_REQUEST_SIZE) { + * LogAction logAction = helper.record(requestedObjects); + * if (logAction.shouldLog()) { + * LOG.warn("Received {} large request(s) with a total of {} objects " + + * "requested; maximum objects requested was {}", + * logAction.getCount(), logAction.getStats(0).getSum(), + * logAction.getStats(0).getMax()); + * } + * } + * } + * }+ * The above snippet allows you to record extraneous events, but if they become + * frequent, to limit their presence in the log to only every 5 seconds while + * still maintaining overall information about how many large requests were + * received. + * + * This class can also be used to coordinate multiple logging points; see + * {@link #record(String, long, double...)} for more details. + * + * This class is not thread-safe. + */ +public class LogThrottlingHelper { + + /** + * An indication of what action the caller should take. If + * {@link #shouldLog()} is false, no other action should be taken, and it is + * an error to try to access any of the summary information. If + * {@link #shouldLog()} is true, then the caller should write to its log, and + * can use the {@link #getCount()} and {@link #getStats(int)} methods to + * determine summary information about what has been recorded into this + * helper. + * + * All summary information in this action only represents + * {@link #record(double...)} statements which were called after the + * last time the caller logged something; that is, since the last time a log + * action was returned with a true value for {@link #shouldLog()}. Information + * about the {@link #record(double...)} statement which created this log + * action is included. 
+ */ + public interface LogAction { + + /** + * Return the number of records encapsulated in this action; that is, the + * number of times {@code record} was called to produce this action, + * including the current one. + */ + int getCount(); + + /** + * Return summary information for the value that was recorded at index + * {@code idx}. Corresponds to the ordering of values passed to + * {@link #record(double...)}. + */ + SummaryStatistics getStats(int idx); + + /** + * If this is true, the caller should write to its log. Otherwise, the + * caller should take no action, and it is an error to call other methods + * on this object. + */ + boolean shouldLog(); + + } + + /** + * A {@link LogAction} representing a state that should not yet be logged. + * If any attempt is made to extract information from this, it will throw + * an {@link IllegalStateException}. + */ + public static final LogAction DO_NOT_LOG = new NoLogAction(); + private static final String DEFAULT_RECORDER_NAME = + "__DEFAULT_RECORDER_NAME__"; + + /** + * This throttler will not trigger log statements more frequently than this + * period. + */ + private final long minLogPeriodMs; + /** + * The name of the recorder treated as the primary; this is the only one which + * will trigger logging. Other recorders are dependent on the state of this + * recorder. This may be null, in which case a primary has not yet been set. + */ + private String primaryRecorderName; + private final Timer timer; + private final Map
{@code + * helper.record(1, 0); + * LogAction action = helper.record(3, 100); + * action.getStats(0); // getMean() == 2 when both records are summarized together + * action.getStats(1); // getMean() == 50 under the same condition + * }+ * + * @param values The values about which to maintain summary information. Every + * time this method is called, the same number of values must + * be specified. + * @return A LogAction indicating whether or not the caller should write to + * its log. + */ + public LogAction record(double... values) { + return record(DEFAULT_RECORDER_NAME, timer.monotonicNow(), values); + } + + /** + * Record some set of values at the specified time into this helper. This can + * be useful to avoid fetching the current time twice if the caller has + * already done so for other purposes. This additionally allows the caller to + * specify a name for this recorder. When multiple names are used, one is + * denoted as the primary recorder. Only recorders named as the primary + * will trigger logging; other names not matching the primary can only + * be triggered by following the primary. This is used to coordinate multiple + * logging points. A primary can be set via the + * {@link #LogThrottlingHelper(long, String)} constructor. If no primary + * is set in the constructor, then the first recorder name used becomes the + * primary. + * + * If multiple names are used, they maintain entirely different sets of values + * and summary information. For example: + *
{@code + * // Initialize "pre" as the primary recorder name + * LogThrottlingHelper helper = new LogThrottlingHelper(1000, "pre"); + * LogAction preLog = helper.record("pre", Time.monotonicNow()); + * if (preLog.shouldLog()) { + * // ... + * } + * double eventsProcessed = ... // perform some action + * LogAction postLog = + * helper.record("post", Time.monotonicNow(), eventsProcessed); + * if (postLog.shouldLog()) { + * // ... + * // Can use postLog.getStats(0) to access eventsProcessed information + * } + * }+ * Since "pre" is the primary recorder name, recording to "pre" will trigger a + * log action if enough time has elapsed. This will indicate that "post" + * should log as well. This ensures that "post" is always logged in the same + * iteration as "pre", yet each one is able to maintain its own summary + * information. + * + * Other behavior is the same as {@link #record(double...)}. + * + * @param recorderName The name of the recorder. This is used to check if the + * current recorder is the primary. Other names are + * arbitrary and are only used to differentiate between + * distinct recorders. + * @param currentTimeMs The current time, in milliseconds. + * @param values The values to record. + * @return The LogAction for the specified recorder. + * + * @see #record(double...) + */ + public LogAction record(String recorderName, long currentTimeMs, + double... 
values) { + if (primaryRecorderName == null) { + primaryRecorderName = recorderName; + } + LoggingAction currentLog = currentLogs.get(recorderName); + if (currentLog == null || currentLog.hasLogged()) { + currentLog = new LoggingAction(values.length); + if (!currentLogs.containsKey(recorderName)) { + // A recorder name seen for the first time always logs right away + currentLog.setShouldLog(); + } + currentLogs.put(recorderName, currentLog); + } + currentLog.recordValues(values); + if (primaryRecorderName.equals(recorderName) && + currentTimeMs - minLogPeriodMs >= lastLogTimestampMs) { + lastLogTimestampMs = currentTimeMs; + for (LoggingAction log : currentLogs.values()) { + log.setShouldLog(); + } + } + if (currentLog.shouldLog()) { + currentLog.setHasLogged(); + return currentLog; + } else { + return DO_NOT_LOG; + } + } + + /** + * A standard log action which keeps track of all of the values which have + * been recorded. This is also used for internal bookkeeping via its private + * fields and methods; it will maintain whether or not it is ready to be + * logged ({@link #shouldLog()}) as well as whether or not it has been + * returned for logging yet ({@link #hasLogged()}). 
+ */ + private static class LoggingAction implements LogAction { + + private int count = 0; + private final SummaryStatistics[] stats; + private boolean shouldLog = false; + private boolean hasLogged = false; + + LoggingAction(int valueCount) { + stats = new SummaryStatistics[valueCount]; + for (int i = 0; i < stats.length; i++) { + stats[i] = new SummaryStatistics(); + } + } + + public int getCount() { + return count; + } + + public SummaryStatistics getStats(int idx) { + if (idx < 0 || idx >= stats.length) { + throw new IllegalArgumentException("Requested stats at idx " + idx + + " but this log only maintains " + stats.length + " stats"); + } + return stats[idx]; + } + + public boolean shouldLog() { + return shouldLog; + } + + private void setShouldLog() { + shouldLog = true; + } + + private boolean hasLogged() { + return hasLogged; + } + + private void setHasLogged() { + hasLogged = true; + } + + private void recordValues(double... values) { + if (values.length != stats.length) { + throw new IllegalArgumentException("received " + values.length + + " values but expected " + stats.length); + } + count++; + for (int i = 0; i < values.length; i++) { + stats[i].addValue(values[i]); + } + } + + } + + /** + * A non-logging action. 
+ * + * @see #DO_NOT_LOG + */ + private static class NoLogAction implements LogAction { + + public int getCount() { + throw new IllegalStateException("Cannot be logged yet!"); + } + + public SummaryStatistics getStats(int idx) { + throw new IllegalStateException("Cannot be logged yet!"); + } + + public boolean shouldLog() { + return false; + } + + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/log/TestLogThrottlingHelper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/log/TestLogThrottlingHelper.java new file mode 100644 index 00000000000..a675d0a589a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/log/TestLogThrottlingHelper.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.log; + +import org.apache.hadoop.log.LogThrottlingHelper.LogAction; +import org.apache.hadoop.util.FakeTimer; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link LogThrottlingHelper}. 
+ */ +public class TestLogThrottlingHelper { + + private static final int LOG_PERIOD = 100; + + private LogThrottlingHelper helper; + private FakeTimer timer; + + @Before + public void setup() { + timer = new FakeTimer(); + helper = new LogThrottlingHelper(LOG_PERIOD, null, timer); + } + + @Test + public void testBasicLogging() { + assertTrue(helper.record().shouldLog()); + + for (int i = 0; i < 5; i++) { + timer.advance(LOG_PERIOD / 10); + assertFalse(helper.record().shouldLog()); + } + timer.advance(LOG_PERIOD); + assertTrue(helper.record().shouldLog()); + } + + @Test + public void testLoggingWithValue() { + assertTrue(helper.record(1).shouldLog()); + + for (int i = 0; i < 4; i++) { + timer.advance(LOG_PERIOD / 5); + assertFalse(helper.record(i % 2 == 0 ? 0 : 1).shouldLog()); + } + + timer.advance(LOG_PERIOD); + LogAction action = helper.record(0.5); + assertTrue(action.shouldLog()); + assertEquals(5, action.getCount()); + assertEquals(0.5, action.getStats(0).getMean(), 0.01); + assertEquals(1.0, action.getStats(0).getMax(), 0.01); + assertEquals(0.0, action.getStats(0).getMin(), 0.01); + } + + @Test + public void testLoggingWithMultipleValues() { + assertTrue(helper.record(1).shouldLog()); + + for (int i = 0; i < 4; i++) { + timer.advance(LOG_PERIOD / 5); + int base = i % 2 == 0 ? 
0 : 1; + assertFalse(helper.record(base, base * 2).shouldLog()); + } + + timer.advance(LOG_PERIOD); + LogAction action = helper.record(0.5, 1.0); + assertTrue(action.shouldLog()); + assertEquals(5, action.getCount()); + for (int i = 1; i <= 2; i++) { + assertEquals(0.5 * i, action.getStats(i - 1).getMean(), 0.01); + assertEquals(1.0 * i, action.getStats(i - 1).getMax(), 0.01); + assertEquals(0.0, action.getStats(i - 1).getMin(), 0.01); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testLoggingWithInconsistentValues() { + assertTrue(helper.record(1, 2).shouldLog()); + helper.record(1, 2); + helper.record(1, 2, 3); + } + + @Test + public void testNamedLoggersWithoutSpecifiedPrimary() { + assertTrue(helper.record("foo", 0).shouldLog()); + assertTrue(helper.record("bar", 0).shouldLog()); + + assertFalse(helper.record("foo", LOG_PERIOD / 2).shouldLog()); + assertFalse(helper.record("bar", LOG_PERIOD / 2).shouldLog()); + + assertTrue(helper.record("foo", LOG_PERIOD).shouldLog()); + assertTrue(helper.record("bar", LOG_PERIOD).shouldLog()); + + assertFalse(helper.record("foo", (LOG_PERIOD * 3) / 2).shouldLog()); + assertFalse(helper.record("bar", (LOG_PERIOD * 3) / 2).shouldLog()); + + assertFalse(helper.record("bar", LOG_PERIOD * 2).shouldLog()); + assertTrue(helper.record("foo", LOG_PERIOD * 2).shouldLog()); + assertTrue(helper.record("bar", LOG_PERIOD * 2).shouldLog()); + } + + @Test + public void testPrimaryAndDependentLoggers() { + helper = new LogThrottlingHelper(LOG_PERIOD, "foo", timer); + + assertTrue(helper.record("foo", 0).shouldLog()); + assertTrue(helper.record("bar", 0).shouldLog()); + assertFalse(helper.record("bar", 0).shouldLog()); + assertFalse(helper.record("foo", 0).shouldLog()); + + assertFalse(helper.record("foo", LOG_PERIOD / 2).shouldLog()); + assertFalse(helper.record("bar", LOG_PERIOD / 2).shouldLog()); + + // Both should log once the period has elapsed + assertTrue(helper.record("foo", LOG_PERIOD).shouldLog()); + 
assertTrue(helper.record("bar", LOG_PERIOD).shouldLog()); + + // "bar" should not log yet because "foo" hasn't been triggered + assertFalse(helper.record("bar", LOG_PERIOD * 2).shouldLog()); + assertTrue(helper.record("foo", LOG_PERIOD * 2).shouldLog()); + // The timing of "bar" shouldn't matter as it is dependent on "foo" + assertTrue(helper.record("bar", 0).shouldLog()); + } + + @Test + public void testMultipleLoggersWithValues() { + helper = new LogThrottlingHelper(LOG_PERIOD, "foo", timer); + + assertTrue(helper.record("foo", 0).shouldLog()); + assertTrue(helper.record("bar", 0, 2).shouldLog()); + assertTrue(helper.record("baz", 0, 3, 3).shouldLog()); + + // "bar"/"baz" should not log yet because "foo" hasn't been triggered + assertFalse(helper.record("bar", LOG_PERIOD, 2).shouldLog()); + assertFalse(helper.record("baz", LOG_PERIOD, 3, 3).shouldLog()); + + // All should log once the period has elapsed + LogAction foo = helper.record("foo", LOG_PERIOD); + LogAction bar = helper.record("bar", LOG_PERIOD, 2); + LogAction baz = helper.record("baz", LOG_PERIOD, 3, 3); + assertTrue(foo.shouldLog()); + assertTrue(bar.shouldLog()); + assertTrue(baz.shouldLog()); + assertEquals(1, foo.getCount()); + assertEquals(2, bar.getCount()); + assertEquals(2, baz.getCount()); + assertEquals(2.0, bar.getStats(0).getMean(), 0.01); + assertEquals(3.0, baz.getStats(0).getMean(), 0.01); + assertEquals(3.0, baz.getStats(1).getMean(), 0.01); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java index 33c61686ee3..c8efdf113e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java @@ -26,6 +26,7 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.log.LogThrottlingHelper; import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Timer; @@ -40,6 +41,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORT import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY; +import static org.apache.hadoop.log.LogThrottlingHelper.LogAction; /** * Mimics a ReentrantReadWriteLock but does not directly implement the interface @@ -74,11 +76,8 @@ class FSNamesystemLock { private final long writeLockReportingThresholdMs; /** Last time stamp for write lock. Keep the longest one for multi-entrance.*/ private long writeLockHeldTimeStampNanos; - private int numWriteLockWarningsSuppressed = 0; - /** Time stamp (ms) of the last time a write lock report was written. */ - private long timeStampOfLastWriteLockReportMs = 0; - /** Longest time (ms) a write lock was held since the last report. */ - private long longestWriteLockHeldIntervalMs = 0; + /** Frequency limiter used for reporting long write lock hold times. */ + private final LogThrottlingHelper writeLockReportLogger; /** Threshold (ms) for long holding read lock report. 
*/ private final long readLockReportingThresholdMs; @@ -132,6 +131,8 @@ class FSNamesystemLock { this.lockSuppressWarningIntervalMs = conf.getTimeDuration( DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + this.writeLockReportLogger = + new LogThrottlingHelper(lockSuppressWarningIntervalMs); this.metricsEnabled = conf.getBoolean( DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY, DFS_NAMENODE_LOCK_DETAILED_METRICS_DEFAULT); @@ -230,25 +231,11 @@ class FSNamesystemLock { final long writeLockIntervalMs = TimeUnit.NANOSECONDS.toMillis(writeLockIntervalNanos); - boolean logReport = false; - int numSuppressedWarnings = 0; - long longestLockHeldIntervalMs = 0; + LogAction logAction = LogThrottlingHelper.DO_NOT_LOG; if (needReport && writeLockIntervalMs >= this.writeLockReportingThresholdMs) { - if (writeLockIntervalMs > longestWriteLockHeldIntervalMs) { - longestWriteLockHeldIntervalMs = writeLockIntervalMs; - } - if (currentTimeMs - timeStampOfLastWriteLockReportMs > - this.lockSuppressWarningIntervalMs) { - logReport = true; - numSuppressedWarnings = numWriteLockWarningsSuppressed; - numWriteLockWarningsSuppressed = 0; - longestLockHeldIntervalMs = longestWriteLockHeldIntervalMs; - longestWriteLockHeldIntervalMs = 0; - timeStampOfLastWriteLockReportMs = currentTimeMs; - } else { - numWriteLockWarningsSuppressed++; - } + logAction = writeLockReportLogger + .record("write", currentTimeMs, writeLockIntervalMs); } coarseLock.writeLock().unlock(); @@ -257,13 +244,16 @@ class FSNamesystemLock { addMetric(opName, writeLockIntervalNanos, true); } - if (logReport) { + if (logAction.shouldLog()) { FSNamesystem.LOG.info("FSNamesystem write lock held for " + - writeLockIntervalMs + " ms via\n" + + writeLockIntervalMs + " ms via " + StringUtils.getStackTrace(Thread.currentThread()) + "\tNumber of suppressed write-lock reports: " + - numSuppressedWarnings + "\n\tLongest write-lock held interval: " + - longestLockHeldIntervalMs); + 
(logAction.getCount() - 1) + + "\n\tLongest write-lock held interval: " + + logAction.getStats(0).getMax() + + "\n\tTotal suppressed write-lock held time: " + + (logAction.getStats(0).getSum() - writeLockIntervalMs)); } }