HubSpot Backport HBASE-26304: Reflect out of band locality improvements in metrics and balancer

Bryan Beaudreault 2021-10-22 13:04:45 -04:00
parent b988fd3bb2
commit 102c2bb129
9 changed files with 521 additions and 12 deletions

View File

@ -2033,4 +2033,23 @@ possible configurations would overwhelm and obscure the important.
the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size
</description>
</property>
<property>
<name>hbase.locality.inputstream.derive.enabled</name>
<value>false</value>
<description>
If true, derive StoreFile locality metrics from the underlying DFSInputStream
backing reads for that StoreFile. The value updates as the DFSInputStream's
block locations change over time. Otherwise, locality is computed once when the
StoreFile is opened, and cached until the StoreFile is closed.
</description>
</property>
<property>
<name>hbase.locality.inputstream.derive.cache.period</name>
<value>60000</value>
<description>
If deriving StoreFile locality metrics from the underlying DFSInputStream, how
long (in milliseconds) the derived values should be cached. The derivation process
may involve hitting the namenode if the DFSInputStream's block list is incomplete.
</description>
</property>
</configuration>
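For reference, a minimal sketch of setting these keys programmatically on a Configuration, e.g. in an embedded or test setup (in a deployment they would normally go in hbase-site.xml); the class name and values here are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class EnableStreamLocalityExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Derive StoreFile locality from the backing DFSInputStream instead of only
    // computing it once when the StoreFile is opened.
    conf.setBoolean("hbase.locality.inputstream.derive.enabled", true);
    // Cache the derived HDFSBlocksDistribution for 30 seconds (default is 60000 ms).
    conf.setInt("hbase.locality.inputstream.derive.cache.period", 30_000);
  }
}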

View File

@ -126,6 +126,10 @@ public class FileLink {
this.in = tryOpen();
}
private FSDataInputStream getUnderlyingInputStream() {
return in;
}
@Override
public int read() throws IOException {
int res;
@ -475,6 +479,17 @@ public class FileLink {
return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
}
/**
* If the passed FSDataInputStream is backed by a FileLink, returns the underlying
* InputStream for the resolved link target. Otherwise, returns null.
*/
public static FSDataInputStream getUnderlyingFileLinkInputStream(FSDataInputStream stream) {
if (stream.getWrappedStream() instanceof FileLinkInputStream) {
return ((FileLinkInputStream) stream.getWrappedStream()).getUnderlyingInputStream();
}
return null;
}
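A hedged usage sketch of the new helper: open the link, unwrap the stream, and check whether the resolved target is HDFS-backed. The wrapper class and method name below are placeholders; the FileLink and FileSystem are assumed to exist already.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

final class FileLinkUnwrapExample {
  /** Returns true if the stream behind a FileLink resolves to an HDFS-backed stream. */
  static boolean isHdfsBacked(FileLink link, FileSystem fs) throws IOException {
    try (FSDataInputStream stream = link.open(fs)) {
      // getUnderlyingFileLinkInputStream returns null when the stream is not FileLink-backed.
      FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream);
      return underlying instanceof HdfsDataInputStream;
    }
  }
}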
/**
* NOTE: This method must be used only in the constructor!
* It creates a List with the specified locations for the link.

View File

@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@ -30,6 +31,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -63,6 +66,7 @@ class RegionLocationFinder {
private static final long CACHE_TIME = 240 * 60 * 1000;
private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution();
private Configuration conf;
private static final float EPSILON = 0.0001f;
private volatile ClusterMetrics status;
private MasterServices services;
private final ListeningExecutorService executor;
@ -127,12 +131,68 @@ class RegionLocationFinder {
public void setClusterMetrics(ClusterMetrics status) {
long currentTime = EnvironmentEdgeManager.currentTime();
if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
this.status = status;
// Only count the refresh if it includes user tables (i.e. more than just meta and namespace).
lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
} else {
refreshLocalityChangedRegions(this.status, status);
this.status = status;
}
}
/**
* If locality for a region has changed, that almost certainly means our cache is out of date.
* Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality.
*/
private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
if (oldStatus == null || newStatus == null) {
LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}", oldStatus, newStatus);
return;
}
Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();
Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
for (RegionInfo regionInfo : cache.asMap().keySet()) {
regionsByName.put(regionInfo.getEncodedName(), regionInfo);
}
for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
RegionInfo region = regionsByName.get(encodedName);
if (region == null) {
continue;
}
float newLocality = regionEntry.getValue().getDataLocality();
float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);
if (Math.abs(newLocality - oldLocality) > EPSILON) {
LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
region.getEncodedName(), oldLocality, newLocality);
cache.refresh(region);
}
}
}
}
private float getOldLocality(ServerName newServer, byte[] regionName, Map<ServerName, ServerMetrics> oldServers) {
ServerMetrics serverMetrics = oldServers.get(newServer);
if (serverMetrics == null) {
return -1f;
}
RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
if (regionMetrics == null) {
return -1f;
}
return regionMetrics.getDataLocality();
}
/**

View File

@ -29,6 +29,7 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -122,6 +124,7 @@ public class HStoreFile implements StoreFile {
// StoreFile.Reader
private volatile StoreFileReader initialReader;
private volatile InputStreamBlockDistribution initialReaderBlockDistribution = null;
// Block cache configuration and reference.
private final CacheConfig cacheConf;
@ -347,7 +350,11 @@ public class HStoreFile implements StoreFile {
* file is opened.
*/
public HDFSBlocksDistribution getHDFSBlockDistribution() {
if (initialReaderBlockDistribution != null) {
return initialReaderBlockDistribution.getHDFSBlockDistribution();
} else {
return this.fileInfo.getHDFSBlockDistribution();
}
}
/**
@ -365,6 +372,13 @@ public class HStoreFile implements StoreFile {
fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
}
this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
if (InputStreamBlockDistribution.isEnabled(fileInfo.getConf())) {
boolean useHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum();
FSDataInputStream stream = context.getInputStreamWrapper().getStream(useHBaseChecksum);
this.initialReaderBlockDistribution = new InputStreamBlockDistribution(stream, fileInfo);
}
// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Computes the HDFSBlockDistribution for a file based on the underlying located blocks
* for an HdfsDataInputStream reading that file. This computation may involve a call to
* the namenode, so the value is cached based on
* {@link #HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD}.
*/
@InterfaceAudience.Private
public class InputStreamBlockDistribution {
private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class);
private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED =
"hbase.locality.inputstream.derive.enabled";
private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false;
private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD =
"hbase.locality.inputstream.derive.cache.period";
private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000;
private final FSDataInputStream stream;
private final StoreFileInfo fileInfo;
private final int cachePeriodMs;
private HDFSBlocksDistribution hdfsBlocksDistribution;
private long lastCachedAt;
private boolean streamUnsupported;
public InputStreamBlockDistribution(FSDataInputStream stream, StoreFileInfo fileInfo) {
this.stream = stream;
this.fileInfo = fileInfo;
this.cachePeriodMs = fileInfo.getConf().getInt(
HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD,
DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD);
this.lastCachedAt = EnvironmentEdgeManager.currentTime();
this.streamUnsupported = false;
this.hdfsBlocksDistribution = fileInfo.getHDFSBlockDistribution();
}
/**
* True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream
*/
public static boolean isEnabled(Configuration conf) {
return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED,
DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED);
}
/**
* Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache
* is expired.
*/
public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() {
if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) {
try {
LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo);
computeBlockDistribution();
} catch (IOException e) {
LOG.warn("Failed to recompute block distribution for {}. Falling back on cached value.",
fileInfo, e);
}
}
return hdfsBlocksDistribution;
}
private void computeBlockDistribution() throws IOException {
lastCachedAt = EnvironmentEdgeManager.currentTime();
FSDataInputStream stream;
if (fileInfo.isLink()) {
stream = FileLink.getUnderlyingFileLinkInputStream(this.stream);
} else {
stream = this.stream;
}
if (!(stream instanceof HdfsDataInputStream)) {
if (!streamUnsupported) {
LOG.warn("{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be "
+ "used to derive locality. Falling back on cached value.",
stream, fileInfo, fileInfo.isLink());
streamUnsupported = true;
}
return;
}
streamUnsupported = false;
hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) stream);
}
/**
* For tests only, sets lastCachedAt so we can force a refresh
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
synchronized void setLastCachedAt(long timestamp) {
lastCachedAt = timestamp;
}
/**
* For tests only, returns the configured cache period
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
long getCachePeriodMs() {
return cachePeriodMs;
}
/**
* For tests only, returns whether the passed stream is supported
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
boolean isStreamUnsupported() {
return streamUnsupported;
}
}
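A hedged sketch of how a caller might consume the derived distribution. InputStreamBlockDistribution is IA.Private, so this is illustrative rather than a supported API; the stream, StoreFileInfo, and host name are assumed inputs, and getBlockLocalityIndex is assumed to be the existing HDFSBlocksDistribution accessor for per-host locality.

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.InputStreamBlockDistribution;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;

final class StreamLocalityExample {
  /**
   * Sketch: report the locality index of a store file for the given host, refreshing
   * from the stream's located blocks once the cache period has elapsed.
   */
  static float localityFor(FSDataInputStream stream, StoreFileInfo fileInfo, String host) {
    InputStreamBlockDistribution derived = new InputStreamBlockDistribution(stream, fileInfo);
    HDFSBlocksDistribution distribution = derived.getHDFSBlockDistribution();
    // Fraction of total block weight stored on the given host, in [0, 1].
    return distribution.getBlockLocalityIndex(host);
  }
}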

View File

@ -79,7 +79,10 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Progressable;
@ -705,6 +708,38 @@ public final class FSUtils {
return fs.exists(metaRegionDir);
}
/**
* Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams
* are backed by a series of LocatedBlocks, which are fetched periodically from the namenode.
* This method retrieves those blocks from the input stream and uses them to calculate
* the HDFSBlocksDistribution.
*
* The underlying method in DFSInputStream does attempt to use locally cached blocks, but
* may hit the namenode if the cache is determined to be incomplete. The method also involves
* making copies of all LocatedBlocks rather than returning the underlying blocks themselves.
*/
public static HDFSBlocksDistribution computeHDFSBlocksDistribution(HdfsDataInputStream inputStream)
throws IOException {
List<LocatedBlock> blocks = inputStream.getAllBlocks();
HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
for (LocatedBlock block : blocks) {
String[] hosts = getHostsForLocations(block);
long len = block.getBlockSize();
StorageType[] storageTypes = block.getStorageTypes();
blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
}
return blocksDistribution;
}
private static String[] getHostsForLocations(LocatedBlock block) {
DatanodeInfo[] locations = block.getLocations();
String[] hosts = new String[locations.length];
for (int i = 0; i < hosts.length; i++) {
hosts[i] = locations[i].getHostName();
}
return hosts;
}
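A hedged sketch of using the new FSUtils helper directly on an open file; the wrapper class and method below are placeholders, and the path is assumed to live on HDFS so that fs.open returns an HdfsDataInputStream.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

final class StreamDistributionExample {
  /** Sketch: derive a block distribution from an open HDFS stream, or null if not HDFS-backed. */
  static HDFSBlocksDistribution distributionFor(FileSystem fs, Path path) throws IOException {
    try (FSDataInputStream in = fs.open(path)) {
      if (in instanceof HdfsDataInputStream) {
        // Uses the stream's cached LocatedBlocks; may consult the namenode if they are incomplete.
        return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) in);
      }
      return null;
    }
  }
}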
/**
* Compute HDFS blocks distribution of a given file, or a portion of the file
* @param fs file system

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.ClassRule;
import org.junit.Test;
@ -88,6 +89,40 @@ public class TestFileLink {
assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering
}
/**
* Test that the stream returned by {@link FileLink#open(FileSystem)} can be unwrapped
* to a {@link HdfsDataInputStream} by
* {@link FileLink#getUnderlyingFileLinkInputStream(FSDataInputStream)}
*/
@Test
public void testGetUnderlyingFSDataInputStream() throws Exception {
HBaseTestingUtility testUtil = new HBaseTestingUtility();
Configuration conf = testUtil.getConfiguration();
conf.setInt("dfs.blocksize", 1024 * 1024);
conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
testUtil.startMiniDFSCluster(1);
try {
MiniDFSCluster cluster = testUtil.getDFSCluster();
FileSystem fs = cluster.getFileSystem();
Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
writeSomeData(fs, originalPath, 256 << 20, (byte) 2);
List<Path> files = new ArrayList<Path>();
files.add(originalPath);
FileLink link = new FileLink(files);
FSDataInputStream stream = link.open(fs);
FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream);
assertTrue(underlying instanceof HdfsDataInputStream);
} finally {
testUtil.shutdownMiniCluster();
}
}
/**
* Test, on HDFS, that the FileLink is still readable
* even when the current file gets renamed.

View File

@ -0,0 +1,165 @@
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, MediumTests.class})
public class TestInputStreamBlockDistribution {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestInputStreamBlockDistribution.class);
private Configuration conf;
private FileSystem fs;
private Path testPath;
@Before
public void setUp() throws Exception {
HBaseTestingUtility testUtil = new HBaseTestingUtility();
conf = testUtil.getConfiguration();
conf.setInt("dfs.blocksize", 1024 * 1024);
conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
testUtil.startMiniDFSCluster(1);
MiniDFSCluster cluster = testUtil.getDFSCluster();
fs = cluster.getFileSystem();
testPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
writeSomeData(fs, testPath, 256 << 20, (byte)2);
}
@After
public void tearDown() throws Exception {
fs.delete(testPath, false);
fs.close();
}
@Test
public void itDerivesLocalityFromHFileInputStream() throws Exception {
try (FSDataInputStream stream = fs.open(testPath)) {
HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
InputStreamBlockDistribution test =
new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, false));
assertSame(initial, test.getHDFSBlockDistribution());
test.setLastCachedAt(test.getCachePeriodMs() + 1);
assertNotSame(initial, test.getHDFSBlockDistribution());
}
}
@Test
public void itDerivesLocalityFromFileLinkInputStream() throws Exception {
List<Path> files = new ArrayList<Path>();
files.add(testPath);
FileLink link = new FileLink(files);
try (FSDataInputStream stream = link.open(fs)) {
HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
InputStreamBlockDistribution test = new InputStreamBlockDistribution(stream,
getMockedStoreFileInfo(initial, true));
assertSame(initial, test.getHDFSBlockDistribution());
test.setLastCachedAt(test.getCachePeriodMs() + 1);
assertNotSame(initial, test.getHDFSBlockDistribution());
}
}
@Test
public void itFallsBackOnLastKnownValueWhenUnsupported() {
FSDataInputStream fakeStream = mock(FSDataInputStream.class);
HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream,
getMockedStoreFileInfo(initial, false));
assertSame(initial, test.getHDFSBlockDistribution());
test.setLastCachedAt(test.getCachePeriodMs() + 1);
// fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve
assertSame(initial, test.getHDFSBlockDistribution());
assertTrue(test.isStreamUnsupported());
}
@Test
public void itFallsBackOnLastKnownValueOnException() throws IOException {
HdfsDataInputStream fakeStream = mock(HdfsDataInputStream.class);
when(fakeStream.getAllBlocks()).thenThrow(new IOException("test"));
HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream,
getMockedStoreFileInfo(initial, false));
assertSame(initial, test.getHDFSBlockDistribution());
test.setLastCachedAt(test.getCachePeriodMs() + 1);
// fakeStream throws an exception, so falls back on original
assertSame(initial, test.getHDFSBlockDistribution());
assertFalse(test.isStreamUnsupported());
}
/**
* Write up to 'size' bytes with value 'v' into a new file called 'path'.
*/
private void writeSomeData (FileSystem fs, Path path, long size, byte v) throws IOException {
byte[] data = new byte[4096];
for (int i = 0; i < data.length; i++) {
data[i] = v;
}
FSDataOutputStream stream = fs.create(path);
try {
long written = 0;
while (written < size) {
stream.write(data, 0, data.length);
written += data.length;
}
} finally {
stream.close();
}
}
private StoreFileInfo getMockedStoreFileInfo(HDFSBlocksDistribution distribution,
boolean isFileLink) {
StoreFileInfo mock = mock(StoreFileInfo.class);
when(mock.getHDFSBlockDistribution())
.thenReturn(distribution);
when(mock.getConf()).thenReturn(conf);
when(mock.isLink()).thenReturn(isFileLink);
return mock;
}
}

View File

@ -28,6 +28,8 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@ -52,6 +54,7 @@ import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@ -105,7 +108,30 @@ public class TestFSUtils {
out.close();
}
@Test
public void testComputeHDFSBlocksDistributionByInputStream() throws Exception {
testComputeHDFSBlocksDistribution((fs, testFile) -> {
try (FSDataInputStream open = fs.open(testFile)) {
assertTrue(open instanceof HdfsDataInputStream);
return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open);
}
});
}
@Test
public void testComputeHDFSBlockDistribution() throws Exception {
testComputeHDFSBlocksDistribution((fs, testFile) -> {
FileStatus status = fs.getFileStatus(testFile);
return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
});
}
@FunctionalInterface
interface HDFSBlockDistributionFunction {
HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException;
}
private void testComputeHDFSBlocksDistribution(HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception {
final int DEFAULT_BLOCK_SIZE = 1024;
conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
MiniDFSCluster cluster = null;
@ -129,9 +155,9 @@ public class TestFSUtils {
boolean ok;
do {
ok = true;
HDFSBlocksDistribution blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
long uniqueBlocksTotalWeight =
blocksDistribution.getUniqueBlocksTotalWeight();
for (String host : hosts) {
@ -163,9 +189,7 @@ public class TestFSUtils {
long weight;
long uniqueBlocksTotalWeight;
do {
HDFSBlocksDistribution blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();
String tophost = blocksDistribution.getTopHosts().get(0);
@ -196,8 +220,7 @@ public class TestFSUtils {
final long maxTime = System.currentTimeMillis() + 2000;
HDFSBlocksDistribution blocksDistribution;
do {
blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
// NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
}
while (blocksDistribution.getTopHosts().size() != 3 && System.currentTimeMillis() < maxTime);