HDFS-11789. Maintain Short-Circuit Read Statistics. Contributed by Hanisha Koneru.
This commit is contained in:
parent
49aa60e50d
commit
6d116ffad2
|
@ -343,6 +343,10 @@ public interface HdfsClientConfigKeys {
|
|||
int STREAMS_CACHE_SIZE_DEFAULT = 256;
|
||||
String STREAMS_CACHE_EXPIRY_MS_KEY = PREFIX + "streams.cache.expiry.ms";
|
||||
long STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5*MINUTE;
|
||||
|
||||
String METRICS_SAMPLING_PERCENTAGE_KEY =
|
||||
PREFIX + "metrics.sampling.percentage";
|
||||
int METRICS_SAMPLING_PERCENTAGE_DEFAULT = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,17 +17,16 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.client.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.ReadOption;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.BlockReader;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
||||
import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider;
|
||||
import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
|
@ -35,15 +34,19 @@ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
|
|||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DirectBufferPool;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
import org.apache.htrace.core.Tracer;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.EnumSet;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* BlockReaderLocal enables local short circuited reads. If the DFS client is on
|
||||
* the same machine as the datanode, then the client can read files directly
|
||||
|
@ -66,6 +69,11 @@ class BlockReaderLocal implements BlockReader {
|
|||
|
||||
private static final DirectBufferPool bufferPool = new DirectBufferPool();
|
||||
|
||||
private static BlockReaderLocalMetrics metrics;
|
||||
private static Lock metricsInitializationLock = new ReentrantLock();
|
||||
private final BlockReaderIoProvider blockReaderIoProvider;
|
||||
private static final Timer TIMER = new Timer();
|
||||
|
||||
public static class Builder {
|
||||
private final int bufferSize;
|
||||
private boolean verifyChecksum;
|
||||
|
@ -76,8 +84,10 @@ class BlockReaderLocal implements BlockReader {
|
|||
private ExtendedBlock block;
|
||||
private StorageType storageType;
|
||||
private Tracer tracer;
|
||||
private ShortCircuitConf shortCircuitConf;
|
||||
|
||||
public Builder(ShortCircuitConf conf) {
|
||||
this.shortCircuitConf = conf;
|
||||
this.maxReadahead = Integer.MAX_VALUE;
|
||||
this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
|
||||
this.bufferSize = conf.getShortCircuitBufferSize();
|
||||
|
@ -269,6 +279,20 @@ class BlockReaderLocal implements BlockReader {
|
|||
this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
|
||||
this.storageType = builder.storageType;
|
||||
this.tracer = builder.tracer;
|
||||
|
||||
if (builder.shortCircuitConf.isScrMetricsEnabled()) {
|
||||
metricsInitializationLock.lock();
|
||||
try {
|
||||
if (metrics == null) {
|
||||
metrics = BlockReaderLocalMetrics.create();
|
||||
}
|
||||
} finally {
|
||||
metricsInitializationLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
this.blockReaderIoProvider = new BlockReaderIoProvider(
|
||||
builder.shortCircuitConf, metrics, TIMER);
|
||||
}
|
||||
|
||||
private synchronized void createDataBufIfNeeded() {
|
||||
|
@ -342,7 +366,7 @@ class BlockReaderLocal implements BlockReader {
|
|||
long startDataPos = dataPos;
|
||||
int startBufPos = buf.position();
|
||||
while (buf.hasRemaining()) {
|
||||
int nRead = dataIn.read(buf, dataPos);
|
||||
int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos);
|
||||
if (nRead < 0) {
|
||||
break;
|
||||
}
|
||||
|
@ -435,7 +459,7 @@ class BlockReaderLocal implements BlockReader {
|
|||
freeChecksumBufIfExists();
|
||||
int total = 0;
|
||||
while (buf.hasRemaining()) {
|
||||
int nRead = dataIn.read(buf, dataPos);
|
||||
int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos);
|
||||
if (nRead <= 0) break;
|
||||
dataPos += nRead;
|
||||
total += nRead;
|
||||
|
@ -574,7 +598,8 @@ class BlockReaderLocal implements BlockReader {
|
|||
int len) throws IOException {
|
||||
freeDataBufIfExists();
|
||||
freeChecksumBufIfExists();
|
||||
int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
|
||||
int nRead = blockReaderIoProvider.read(
|
||||
dataIn, ByteBuffer.wrap(arr, off, len), dataPos);
|
||||
if (nRead > 0) {
|
||||
dataPos += nRead;
|
||||
} else if ((nRead == 0) && (dataPos == dataIn.size())) {
|
||||
|
@ -627,6 +652,9 @@ class BlockReaderLocal implements BlockReader {
|
|||
replica.unref();
|
||||
freeDataBufIfExists();
|
||||
freeChecksumBufIfExists();
|
||||
if (metrics != null) {
|
||||
metrics.collectThreadLocalStates();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -593,6 +593,10 @@ public class DfsClientConf {
|
|||
private final long shortCircuitStreamsCacheExpiryMs;
|
||||
private final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
|
||||
|
||||
// Short Circuit Read Metrics
|
||||
private final boolean scrMetricsEnabled;
|
||||
private final int scrMetricsSamplingPercentage;
|
||||
|
||||
private final boolean shortCircuitMmapEnabled;
|
||||
private final int shortCircuitMmapCacheSize;
|
||||
private final long shortCircuitMmapCacheExpiryMs;
|
||||
|
@ -615,6 +619,20 @@ public class DfsClientConf {
|
|||
shortCircuitLocalReads = conf.getBoolean(
|
||||
Read.ShortCircuit.KEY,
|
||||
Read.ShortCircuit.DEFAULT);
|
||||
int scrSamplingPercentage = conf.getInt(
|
||||
Read.ShortCircuit.METRICS_SAMPLING_PERCENTAGE_KEY,
|
||||
Read.ShortCircuit.METRICS_SAMPLING_PERCENTAGE_DEFAULT);
|
||||
if (scrSamplingPercentage <= 0) {
|
||||
scrMetricsSamplingPercentage = 0;
|
||||
scrMetricsEnabled = false;
|
||||
} else if (scrSamplingPercentage > 100) {
|
||||
scrMetricsSamplingPercentage = 100;
|
||||
scrMetricsEnabled = true;
|
||||
} else {
|
||||
scrMetricsSamplingPercentage = scrSamplingPercentage;
|
||||
scrMetricsEnabled = true;
|
||||
}
|
||||
|
||||
domainSocketDataTraffic = conf.getBoolean(
|
||||
DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
||||
DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
|
||||
|
@ -693,6 +711,14 @@ public class DfsClientConf {
|
|||
return shortCircuitLocalReads;
|
||||
}
|
||||
|
||||
public boolean isScrMetricsEnabled() {
|
||||
return scrMetricsEnabled;
|
||||
}
|
||||
|
||||
public int getScrMetricsSamplingPercentage() {
|
||||
return scrMetricsSamplingPercentage;
|
||||
}
|
||||
|
||||
public boolean isDomainSocketDataTraffic() {
|
||||
return domainSocketDataTraffic;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.client.impl.metrics;
|
||||
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
/**
|
||||
* Profiles {@link org.apache.hadoop.hdfs.client.impl.BlockReaderLocal} short
|
||||
* circuit read latencies when ShortCircuit read metrics is enabled through
|
||||
* {@link ShortCircuitConf#scrMetricsEnabled}.
|
||||
*/
|
||||
public class BlockReaderIoProvider {
|
||||
public static final Logger LOG = LoggerFactory.getLogger(
|
||||
BlockReaderIoProvider.class);
|
||||
|
||||
private final BlockReaderLocalMetrics metrics;
|
||||
private final boolean isEnabled;
|
||||
private final int sampleRangeMax;
|
||||
private final Timer timer;
|
||||
|
||||
// Threshold in milliseconds above which a warning should be flagged.
|
||||
private static final long SLOW_READ_WARNING_THRESHOLD_MS = 1000;
|
||||
private boolean isWarningLogged = false;
|
||||
|
||||
public BlockReaderIoProvider(@Nullable ShortCircuitConf conf,
|
||||
BlockReaderLocalMetrics metrics, Timer timer) {
|
||||
if (conf != null) {
|
||||
isEnabled = conf.isScrMetricsEnabled();
|
||||
sampleRangeMax = (Integer.MAX_VALUE / 100) *
|
||||
conf.getScrMetricsSamplingPercentage();
|
||||
this.metrics = metrics;
|
||||
this.timer = timer;
|
||||
} else {
|
||||
this.isEnabled = false;
|
||||
this.sampleRangeMax = 0;
|
||||
this.metrics = null;
|
||||
this.timer = null;
|
||||
}
|
||||
}
|
||||
|
||||
public int read(FileChannel dataIn, ByteBuffer dst, long position)
|
||||
throws IOException{
|
||||
final int nRead;
|
||||
if (isEnabled && (ThreadLocalRandom.current().nextInt() < sampleRangeMax)) {
|
||||
long begin = timer.monotonicNow();
|
||||
nRead = dataIn.read(dst, position);
|
||||
long latency = timer.monotonicNow() - begin;
|
||||
addLatency(latency);
|
||||
} else {
|
||||
nRead = dataIn.read(dst, position);
|
||||
}
|
||||
return nRead;
|
||||
}
|
||||
|
||||
private void addLatency(long latency) {
|
||||
metrics.addShortCircuitReadLatency(latency);
|
||||
if (latency > SLOW_READ_WARNING_THRESHOLD_MS && !isWarningLogged) {
|
||||
LOG.warn(String.format("The Short Circuit Local Read latency, %d ms, " +
|
||||
"is higher then the threshold (%d ms). Suppressing further warnings" +
|
||||
" for this BlockReaderLocal.",
|
||||
latency, SLOW_READ_WARNING_THRESHOLD_MS));
|
||||
isWarningLogged = true;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.client.impl.metrics;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
|
||||
|
||||
/**
|
||||
* This class maintains a metric of rolling average latency for short circuit
|
||||
* reads.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@Metrics(name="HdfsShortCircuitReads",
|
||||
about="Block Reader Local's Short Circuit Read latency",
|
||||
context="dfs")
|
||||
public class BlockReaderLocalMetrics {
|
||||
|
||||
@Metric(value = "short circuit read operation rate", valueName = "LatencyMs")
|
||||
private MutableRollingAverages shortCircuitReadRollingAverages;
|
||||
|
||||
private static final String SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME =
|
||||
"HdfsShortCircuitReads";
|
||||
private static final String SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_NAME =
|
||||
"ShortCircuitLocalReads";
|
||||
|
||||
public static BlockReaderLocalMetrics create() {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
BlockReaderLocalMetrics metrics = new BlockReaderLocalMetrics();
|
||||
|
||||
ms.register(
|
||||
SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME, null, metrics);
|
||||
return metrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds short circuit read elapsed time.
|
||||
*/
|
||||
public void addShortCircuitReadLatency(final long latency) {
|
||||
shortCircuitReadRollingAverages.add(
|
||||
SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_NAME, latency);
|
||||
}
|
||||
|
||||
/**
|
||||
* Collects states maintained in {@link ThreadLocal}, if any.
|
||||
*/
|
||||
public void collectThreadLocalStates() {
|
||||
shortCircuitReadRollingAverages.collectThreadLocalStates();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the MutableRollingAverage metric for testing only.
|
||||
* @return
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public MutableRollingAverages getShortCircuitReadRollingAverages() {
|
||||
return shortCircuitReadRollingAverages;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Support for tracking Block Reader Local's latencies.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
package org.apache.hadoop.hdfs.client.impl.metrics;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.client.impl;
|
||||
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider;
|
||||
import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics;
|
||||
import org.apache.hadoop.util.FakeTimer;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
||||
/**
|
||||
* Tests {@link BlockReaderIoProvider}'s profiling of short circuit read
|
||||
* latencies.
|
||||
*/
|
||||
public class TestBlockReaderIoProvider {
|
||||
|
||||
private static final long SLOW_READ_THRESHOLD = 5000;
|
||||
|
||||
private static final FakeTimer TIMER = new FakeTimer();
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
public void testSlowShortCircuitReadsIsRecorded() throws IOException {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit
|
||||
.METRICS_SAMPLING_PERCENTAGE_KEY, 100);
|
||||
DfsClientConf clientConf = new DfsClientConf(conf);
|
||||
|
||||
BlockReaderLocalMetrics metrics = Mockito.mock(
|
||||
BlockReaderLocalMetrics.class);
|
||||
|
||||
FileChannel dataIn = Mockito.mock(FileChannel.class);
|
||||
Mockito.when(dataIn.read(any(ByteBuffer.class), anyLong())).thenAnswer(
|
||||
new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
TIMER.advance(SLOW_READ_THRESHOLD);
|
||||
return 0;
|
||||
}
|
||||
});
|
||||
|
||||
BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider(
|
||||
clientConf.getShortCircuitConf(), metrics, TIMER);
|
||||
|
||||
blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong());
|
||||
|
||||
Mockito.verify(metrics, times(1)).addShortCircuitReadLatency(anyLong());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,227 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.client.impl;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider;
|
||||
import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import org.apache.hadoop.util.FakeTimer;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Test;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* Tests {@link BlockReaderLocalMetrics}'s statistics.
|
||||
*/
|
||||
public class TestBlockReaderLocalMetrics {
|
||||
private static final long ROLLING_AVERAGES_WINDOW_LENGTH_MS = 1000;
|
||||
private static final int ROLLING_AVERAGE_NUM_WINDOWS = 5;
|
||||
private static final long SLOW_READ_DELAY = 2000;
|
||||
private static final String SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME =
|
||||
"HdfsShortCircuitReads";
|
||||
private static final String SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME =
|
||||
"[ShortCircuitLocalReads]RollingAvgLatencyMs";
|
||||
|
||||
private static final FakeTimer TIMER = new FakeTimer();
|
||||
|
||||
private static HdfsConfiguration conf = new HdfsConfiguration();
|
||||
private static DfsClientConf clientConf;
|
||||
|
||||
static {
|
||||
conf = new HdfsConfiguration();
|
||||
conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit
|
||||
.METRICS_SAMPLING_PERCENTAGE_KEY, 100);
|
||||
clientConf = new DfsClientConf(conf);
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
public void testSlowShortCircuitReadsStatsRecorded() throws IOException,
|
||||
InterruptedException, TimeoutException {
|
||||
|
||||
BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create();
|
||||
MutableRollingAverages shortCircuitReadRollingAverages = metrics
|
||||
.getShortCircuitReadRollingAverages();
|
||||
MetricsTestHelper.replaceRollingAveragesScheduler(
|
||||
shortCircuitReadRollingAverages,
|
||||
ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
FileChannel dataIn = Mockito.mock(FileChannel.class);
|
||||
Mockito.when(dataIn.read(any(ByteBuffer.class), anyLong())).thenAnswer(
|
||||
new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
TIMER.advance(SLOW_READ_DELAY);
|
||||
return 0;
|
||||
}
|
||||
});
|
||||
|
||||
BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider(
|
||||
clientConf.getShortCircuitConf(), metrics, TIMER);
|
||||
|
||||
blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong());
|
||||
blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong());
|
||||
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
metrics.collectThreadLocalStates();
|
||||
return shortCircuitReadRollingAverages.getStats(0).size() > 0;
|
||||
}
|
||||
}, 500, 10000);
|
||||
|
||||
MetricsRecordBuilder rb = getMetrics(
|
||||
SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME);
|
||||
double averageLatency = getDoubleGauge(
|
||||
SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb);
|
||||
assertTrue("Average Latency of Short Circuit Reads lower than expected",
|
||||
averageLatency >= SLOW_READ_DELAY);
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
public void testMutlipleBlockReaderIoProviderStats() throws IOException,
|
||||
InterruptedException, TimeoutException {
|
||||
|
||||
BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create();
|
||||
MutableRollingAverages shortCircuitReadRollingAverages = metrics
|
||||
.getShortCircuitReadRollingAverages();
|
||||
MetricsTestHelper.replaceRollingAveragesScheduler(
|
||||
shortCircuitReadRollingAverages,
|
||||
ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
FileChannel dataIn1 = Mockito.mock(FileChannel.class);
|
||||
FileChannel dataIn2 = Mockito.mock(FileChannel.class);
|
||||
|
||||
Mockito.when(dataIn1.read(any(ByteBuffer.class), anyLong())).thenAnswer(
|
||||
new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
TIMER.advance(SLOW_READ_DELAY);
|
||||
return 0;
|
||||
}
|
||||
});
|
||||
|
||||
Mockito.when(dataIn2.read(any(ByteBuffer.class), anyLong())).thenAnswer(
|
||||
new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
TIMER.advance(SLOW_READ_DELAY*3);
|
||||
return 0;
|
||||
}
|
||||
});
|
||||
|
||||
BlockReaderIoProvider blockReaderIoProvider1 = new BlockReaderIoProvider(
|
||||
clientConf.getShortCircuitConf(), metrics, TIMER);
|
||||
BlockReaderIoProvider blockReaderIoProvider2 = new BlockReaderIoProvider(
|
||||
clientConf.getShortCircuitConf(), metrics, TIMER);
|
||||
|
||||
blockReaderIoProvider1.read(dataIn1, any(ByteBuffer.class), anyLong());
|
||||
blockReaderIoProvider2.read(dataIn2, any(ByteBuffer.class), anyLong());
|
||||
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
metrics.collectThreadLocalStates();
|
||||
return shortCircuitReadRollingAverages.getStats(0).size() > 0;
|
||||
}
|
||||
}, 500, 10000);
|
||||
|
||||
MetricsRecordBuilder rb = getMetrics(
|
||||
SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME);
|
||||
double averageLatency = getDoubleGauge(
|
||||
SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb);
|
||||
|
||||
assertTrue("Average Latency of Short Circuit Reads lower than expected",
|
||||
averageLatency >= SLOW_READ_DELAY*2);
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
public void testSlowShortCircuitReadsAverageLatencyValue() throws IOException,
|
||||
InterruptedException, TimeoutException {
|
||||
|
||||
BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create();
|
||||
final MutableRollingAverages shortCircuitReadRollingAverages = metrics
|
||||
.getShortCircuitReadRollingAverages();
|
||||
MetricsTestHelper.replaceRollingAveragesScheduler(
|
||||
shortCircuitReadRollingAverages,
|
||||
ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
Random random = new Random();
|
||||
FileChannel[] dataIns = new FileChannel[5];
|
||||
long totalDelay = 0;
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
dataIns[i] = Mockito.mock(FileChannel.class);
|
||||
long delay = SLOW_READ_DELAY * random.nextInt(5);
|
||||
Mockito.when(dataIns[i].read(any(ByteBuffer.class), anyLong()))
|
||||
.thenAnswer(new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
TIMER.advance(delay);
|
||||
return 0;
|
||||
}
|
||||
});
|
||||
totalDelay += delay;
|
||||
}
|
||||
long expectedAvgLatency = totalDelay / 5;
|
||||
|
||||
BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider(
|
||||
clientConf.getShortCircuitConf(), metrics, TIMER);
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
blockReaderIoProvider.read(dataIns[i], any(ByteBuffer.class), anyLong());
|
||||
}
|
||||
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
metrics.collectThreadLocalStates();
|
||||
return shortCircuitReadRollingAverages.getStats(0).size() > 0;
|
||||
}
|
||||
}, 500, 10000);
|
||||
|
||||
MetricsRecordBuilder rb = getMetrics(
|
||||
SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME);
|
||||
double averageLatency = getDoubleGauge(
|
||||
SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb);
|
||||
|
||||
assertTrue("Average Latency of Short Circuit Reads lower than expected",
|
||||
averageLatency >= expectedAvgLatency);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue