HBASE-26304 Reflect out of band locality improvements in metrics and balancer (#3803)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Bryan Beaudreault 2021-11-26 23:01:46 -05:00 committed by GitHub
parent 33287ac502
commit 1b27124c61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 615 additions and 12 deletions

View File

@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -40,7 +42,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
@ -58,6 +59,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
class RegionHDFSBlockLocationFinder extends Configured {
private static final Logger LOG = LoggerFactory.getLogger(RegionHDFSBlockLocationFinder.class);
private static final long CACHE_TIME = 240 * 60 * 1000;
private static final float EPSILON = 0.0001f;
private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
new HDFSBlocksDistribution();
private volatile ClusterMetrics status;
@ -110,12 +112,70 @@ class RegionHDFSBlockLocationFinder extends Configured {
void setClusterMetrics(ClusterMetrics status) {
long currentTime = EnvironmentEdgeManager.currentTime();
this.status = status;
if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
this.status = status;
// Only count the refresh if it includes user tables ( eg more than meta and namespace ).
lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
} else {
refreshLocalityChangedRegions(this.status, status);
this.status = status;
}
}
/**
* If locality for a region has changed, that pretty certainly means our cache is out of date.
* Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality.
*/
private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
if (oldStatus == null || newStatus == null) {
LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}",
oldStatus, newStatus);
return;
}
Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();
Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
for (RegionInfo regionInfo : cache.asMap().keySet()) {
regionsByName.put(regionInfo.getEncodedName(), regionInfo);
}
for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
RegionInfo region = regionsByName.get(encodedName);
if (region == null) {
continue;
}
float newLocality = regionEntry.getValue().getDataLocality();
float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);
if (Math.abs(newLocality - oldLocality) > EPSILON) {
LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
region.getEncodedName(), oldLocality, newLocality);
cache.refresh(region);
}
}
}
}
private float getOldLocality(ServerName newServer, byte[] regionName,
Map<ServerName, ServerMetrics> oldServers) {
ServerMetrics serverMetrics = oldServers.get(newServer);
if (serverMetrics == null) {
return -1f;
}
RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
if (regionMetrics == null) {
return -1f;
}
return regionMetrics.getDataLocality();
}
/**
@ -159,7 +219,7 @@ class RegionHDFSBlockLocationFinder extends Configured {
return blocksDistribution;
}
} catch (IOException ioe) {
LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = " +
LOG.warn("IOException during HDFSBlocksDistribution computation for region = {}",
region.getEncodedName(), ioe);
}
@ -263,7 +323,7 @@ class RegionHDFSBlockLocationFinder extends Configured {
} catch (InterruptedException ite) {
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
LOG.debug("ExecutionException during HDFSBlocksDistribution computation. for region = " +
LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}",
hregionInfo.getEncodedName(), ee);
}
index++;

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@ -31,12 +32,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@ -204,4 +207,59 @@ public class TestRegionHDFSBlockLocationFinder {
}
}
}
@Test
public void testRefreshRegionsWithChangedLocality() {
ServerName testServer = ServerName.valueOf("host-0", 12345, 12345);
RegionInfo testRegion = REGIONS.get(0);
Map<RegionInfo, HDFSBlocksDistribution> cache = new HashMap<>();
for (RegionInfo region : REGIONS) {
HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
assertHostAndWeightEquals(generate(region), hbd);
cache.put(region, hbd);
}
finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(),
0.123f));
// everything should be cached, because metrics were null before
for (RegionInfo region : REGIONS) {
HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
assertSame(cache.get(region), hbd);
}
finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(),
0.345f));
// locality changed just for our test region, so it should no longer be the same
for (RegionInfo region : REGIONS) {
HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
if (region.equals(testRegion)) {
assertNotSame(cache.get(region), hbd);
} else {
assertSame(cache.get(region), hbd);
}
}
}
private ClusterMetrics getMetricsWithLocality(ServerName serverName, byte[] region,
float locality) {
RegionMetrics regionMetrics = mock(RegionMetrics.class);
when(regionMetrics.getDataLocality()).thenReturn(locality);
Map<byte[], RegionMetrics> regionMetricsMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
regionMetricsMap.put(region, regionMetrics);
ServerMetrics serverMetrics = mock(ServerMetrics.class);
when(serverMetrics.getRegionMetrics()).thenReturn(regionMetricsMap);
Map<ServerName, ServerMetrics> serverMetricsMap = new HashMap<>();
serverMetricsMap.put(serverName, serverMetrics);
ClusterMetrics metrics = mock(ClusterMetrics.class);
when(metrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
return metrics;
}
}

View File

@ -2042,4 +2042,23 @@ possible configurations would overwhelm and obscure the important.
the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size
</description>
</property>
<property>
<name>hbase.locality.inputstream.derive.enabled</name>
<value>false</value>
<description>
If true, derive StoreFile locality metrics from the underlying DFSInputStream
backing reads for that StoreFile. This value will update as the DFSInputStream's
block locations are updated over time. Otherwise, locality is computed on StoreFile
open, and cached until the StoreFile is closed.
</description>
</property>
<property>
<name>hbase.locality.inputstream.derive.cache.period</name>
<value>60000</value>
<description>
If deriving StoreFile locality metrics from the underlying DFSInputStream, how
long should the derived values be cached for. The derivation process may involve
hitting the namenode, if the DFSInputStream's block list is incomplete.
</description>
</property>
</configuration>

View File

@ -126,6 +126,10 @@ public class FileLink {
this.in = tryOpen();
}
private FSDataInputStream getUnderlyingInputStream() {
return in;
}
@Override
public int read() throws IOException {
int res;
@ -475,6 +479,17 @@ public class FileLink {
return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
}
/**
* If the passed FSDataInputStream is backed by a FileLink, returns the underlying
* InputStream for the resolved link target. Otherwise, returns null.
*/
public static FSDataInputStream getUnderlyingFileLinkInputStream(FSDataInputStream stream) {
if (stream.getWrappedStream() instanceof FileLinkInputStream) {
return ((FileLinkInputStream) stream.getWrappedStream()).getUnderlyingInputStream();
}
return null;
}
/**
* NOTE: This method must be used only in the constructor!
* It creates a List with the specified locations for the link.

View File

@ -29,6 +29,7 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -127,6 +128,7 @@ public class HStoreFile implements StoreFile {
// StoreFile.Reader
private volatile StoreFileReader initialReader;
private volatile InputStreamBlockDistribution initialReaderBlockDistribution = null;
// Block cache configuration and reference.
private final CacheConfig cacheConf;
@ -344,7 +346,11 @@ public class HStoreFile implements StoreFile {
* file is opened.
*/
public HDFSBlocksDistribution getHDFSBlockDistribution() {
return this.fileInfo.getHDFSBlockDistribution();
if (initialReaderBlockDistribution != null) {
return initialReaderBlockDistribution.getHDFSBlockDistribution();
} else {
return this.fileInfo.getHDFSBlockDistribution();
}
}
/**
@ -362,6 +368,13 @@ public class HStoreFile implements StoreFile {
fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
}
this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
if (InputStreamBlockDistribution.isEnabled(fileInfo.getConf())) {
boolean useHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum();
FSDataInputStream stream = context.getInputStreamWrapper().getStream(useHBaseChecksum);
this.initialReaderBlockDistribution = new InputStreamBlockDistribution(stream, fileInfo);
}
// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Computes the HDFSBlockDistribution for a file based on the underlying located blocks
* for an HdfsDataInputStream reading that file. The backing DFSInputStream.getAllBlocks involves
* allocating an array of numBlocks size per call. It may also involve calling the namenode, if
* the DFSInputStream has not fetched all the blocks yet. In order to avoid allocation pressure,
* we cache the computed distribution for a configurable period of time.
* <p>
* This class only gets instantiated for the <b>first</b> FSDataInputStream of each StoreFile (i.e.
* the one backing {@link HStoreFile#initialReader}). It's then used to dynamically update the
* value returned by {@link HStoreFile#getHDFSBlockDistribution()}.
* <p>
* Once the backing FSDataInputStream is closed, we should not expect the distribution result
* to change anymore. This is ok becuase the initialReader's InputStream is only closed when the
* StoreFile itself is closed, at which point nothing will be querying getHDFSBlockDistribution
* anymore. If/When the StoreFile is reopened, a new {@link InputStreamBlockDistribution} will
* be created for the new initialReader.
*/
@InterfaceAudience.Private
public class InputStreamBlockDistribution {
private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class);
private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED =
"hbase.locality.inputstream.derive.enabled";
private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false;
private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD =
"hbase.locality.inputstream.derive.cache.period";
private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000;
private final FSDataInputStream stream;
private final StoreFileInfo fileInfo;
private final int cachePeriodMs;
private HDFSBlocksDistribution hdfsBlocksDistribution;
private long lastCachedAt;
private boolean streamUnsupported;
/**
* This should only be called for the first FSDataInputStream of a StoreFile,
* in {@link HStoreFile#open()}.
*
* @see InputStreamBlockDistribution
* @param stream the input stream to derive locality from
* @param fileInfo the StoreFileInfo for the related store file
*/
public InputStreamBlockDistribution(FSDataInputStream stream, StoreFileInfo fileInfo) {
this.stream = stream;
this.fileInfo = fileInfo;
this.cachePeriodMs = fileInfo.getConf().getInt(
HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD,
DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD);
this.lastCachedAt = EnvironmentEdgeManager.currentTime();
this.streamUnsupported = false;
this.hdfsBlocksDistribution = fileInfo.getHDFSBlockDistribution();
}
/**
* True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream
*/
public static boolean isEnabled(Configuration conf) {
return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED,
DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED);
}
/**
* Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache
* is expired.
*/
public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() {
if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) {
try {
LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo);
computeBlockDistribution();
} catch (IOException e) {
LOG.warn("Failed to recompute block distribution for {}. Falling back on cached value.",
fileInfo, e);
}
}
return hdfsBlocksDistribution;
}
private void computeBlockDistribution() throws IOException {
lastCachedAt = EnvironmentEdgeManager.currentTime();
FSDataInputStream stream;
if (fileInfo.isLink()) {
stream = FileLink.getUnderlyingFileLinkInputStream(this.stream);
} else {
stream = this.stream;
}
if (!(stream instanceof HdfsDataInputStream)) {
if (!streamUnsupported) {
LOG.warn("{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be "
+ "used to derive locality. Falling back on cached value.",
stream, fileInfo, fileInfo.isLink());
streamUnsupported = true;
}
return;
}
streamUnsupported = false;
hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) stream);
}
/**
* For tests only, sets lastCachedAt so we can force a refresh
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
synchronized void setLastCachedAt(long timestamp) {
lastCachedAt = timestamp;
}
/**
* For tests only, returns the configured cache period
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
long getCachePeriodMs() {
return cachePeriodMs;
}
/**
* For tests only, returns whether the passed stream is supported
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
boolean isStreamUnsupported() {
return streamUnsupported;
}
}

View File

@ -81,6 +81,9 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Progressable;
@ -703,6 +706,38 @@ public final class FSUtils {
return fs.exists(metaRegionDir);
}
/**
* Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams
* are backed by a series of LocatedBlocks, which are fetched periodically from the namenode.
* This method retrieves those blocks from the input stream and uses them to calculate
* HDFSBlockDistribution.
*
* The underlying method in DFSInputStream does attempt to use locally cached blocks, but
* may hit the namenode if the cache is determined to be incomplete. The method also involves
* making copies of all LocatedBlocks rather than return the underlying blocks themselves.
*/
static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
HdfsDataInputStream inputStream) throws IOException {
List<LocatedBlock> blocks = inputStream.getAllBlocks();
HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
for (LocatedBlock block : blocks) {
String[] hosts = getHostsForLocations(block);
long len = block.getBlockSize();
StorageType[] storageTypes = block.getStorageTypes();
blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
}
return blocksDistribution;
}
private static String[] getHostsForLocations(LocatedBlock block) {
DatanodeInfo[] locations = block.getLocations();
String[] hosts = new String[locations.length];
for (int i = 0; i < hosts.length; i++) {
hosts[i] = locations[i].getHostName();
}
return hosts;
}
/**
* Compute HDFS blocks distribution of a given file, or a portion of the file
* @param fs file system

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.ClassRule;
import org.junit.Test;
@ -88,6 +89,40 @@ public class TestFileLink {
assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering
}
/**
* Test that the returned link from {@link FileLink#open(FileSystem)} can be unwrapped
* to a {@link HdfsDataInputStream} by
* {@link FileLink#getUnderlyingFileLinkInputStream(FSDataInputStream)}
*/
@Test
public void testGetUnderlyingFSDataInputStream() throws Exception {
HBaseTestingUtil testUtil = new HBaseTestingUtil();
Configuration conf = testUtil.getConfiguration();
conf.setInt("dfs.blocksize", 1024 * 1024);
conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
testUtil.startMiniDFSCluster(1);
try {
MiniDFSCluster cluster = testUtil.getDFSCluster();
FileSystem fs = cluster.getFileSystem();
Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
writeSomeData(fs, originalPath, 256 << 20, (byte) 2);
List<Path> files = new ArrayList<Path>();
files.add(originalPath);
FileLink link = new FileLink(files);
FSDataInputStream stream = link.open(fs);
FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream);
assertTrue(underlying instanceof HdfsDataInputStream);
} finally {
testUtil.shutdownMiniCluster();
}
}
/**
* Test, on HDFS, that the FileLink is still readable
* even when the current file gets renamed.

View File

@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, MediumTests.class})
public class TestInputStreamBlockDistribution {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestInputStreamBlockDistribution.class);
private Configuration conf;
private FileSystem fs;
private Path testPath;
@Before
public void setUp() throws Exception {
HBaseTestingUtil testUtil = new HBaseTestingUtil();
conf = testUtil.getConfiguration();
conf.setInt("dfs.blocksize", 1024 * 1024);
conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
testUtil.startMiniDFSCluster(1);
MiniDFSCluster cluster = testUtil.getDFSCluster();
fs = cluster.getFileSystem();
testPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
writeSomeData(fs, testPath, 256 << 20, (byte)2);
}
@After
public void tearDown() throws Exception {
fs.delete(testPath, false);
fs.close();
}
@Test
public void itDerivesLocalityFromHFileInputStream() throws Exception {
try (FSDataInputStream stream = fs.open(testPath)) {
HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
InputStreamBlockDistribution test =
new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, false));
assertSame(initial, test.getHDFSBlockDistribution());
test.setLastCachedAt(test.getCachePeriodMs() + 1);
assertNotSame(initial, test.getHDFSBlockDistribution());
}
}
@Test
public void itDerivesLocalityFromFileLinkInputStream() throws Exception {
List<Path> files = new ArrayList<Path>();
files.add(testPath);
FileLink link = new FileLink(files);
try (FSDataInputStream stream = link.open(fs)) {
HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
InputStreamBlockDistribution test = new InputStreamBlockDistribution(stream,
getMockedStoreFileInfo(initial, true));
assertSame(initial, test.getHDFSBlockDistribution());
test.setLastCachedAt(test.getCachePeriodMs() + 1);
assertNotSame(initial, test.getHDFSBlockDistribution());
}
}
@Test
public void itFallsBackOnLastKnownValueWhenUnsupported() {
FSDataInputStream fakeStream = mock(FSDataInputStream.class);
HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream,
getMockedStoreFileInfo(initial, false));
assertSame(initial, test.getHDFSBlockDistribution());
test.setLastCachedAt(test.getCachePeriodMs() + 1);
// fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve
assertSame(initial, test.getHDFSBlockDistribution());
assertTrue(test.isStreamUnsupported());
}
@Test
public void itFallsBackOnLastKnownValueOnException() throws IOException {
HdfsDataInputStream fakeStream = mock(HdfsDataInputStream.class);
when(fakeStream.getAllBlocks()).thenThrow(new IOException("test"));
HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream,
getMockedStoreFileInfo(initial, false));
assertSame(initial, test.getHDFSBlockDistribution());
test.setLastCachedAt(test.getCachePeriodMs() + 1);
// fakeStream throws an exception, so falls back on original
assertSame(initial, test.getHDFSBlockDistribution());
assertFalse(test.isStreamUnsupported());
}
/**
* Write up to 'size' bytes with value 'v' into a new file called 'path'.
*/
private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
byte[] data = new byte[4096];
for (int i = 0; i < data.length; i++) {
data[i] = v;
}
FSDataOutputStream stream = fs.create(path);
try {
long written = 0;
while (written < size) {
stream.write(data, 0, data.length);
written += data.length;
}
} finally {
stream.close();
}
}
private StoreFileInfo getMockedStoreFileInfo(HDFSBlocksDistribution distribution,
boolean isFileLink) {
StoreFileInfo mock = mock(StoreFileInfo.class);
when(mock.getHDFSBlockDistribution())
.thenReturn(distribution);
when(mock.getConf()).thenReturn(conf);
when(mock.isLink()).thenReturn(isFileLink);
return mock;
}
}

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@ -105,7 +106,31 @@ public class TestFSUtils {
out.close();
}
@Test public void testcomputeHDFSBlocksDistribution() throws Exception {
@Test
public void testComputeHDFSBlocksDistributionByInputStream() throws Exception {
testComputeHDFSBlocksDistribution((fs, testFile) -> {
try (FSDataInputStream open = fs.open(testFile)) {
assertTrue(open instanceof HdfsDataInputStream);
return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open);
}
});
}
@Test
public void testComputeHDFSBlockDistribution() throws Exception {
testComputeHDFSBlocksDistribution((fs, testFile) -> {
FileStatus status = fs.getFileStatus(testFile);
return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
});
}
@FunctionalInterface
interface HDFSBlockDistributionFunction {
HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException;
}
private void testComputeHDFSBlocksDistribution(
HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception {
final int DEFAULT_BLOCK_SIZE = 1024;
conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
MiniDFSCluster cluster = null;
@ -129,9 +154,10 @@ public class TestFSUtils {
boolean ok;
do {
ok = true;
FileStatus status = fs.getFileStatus(testFile);
HDFSBlocksDistribution blocksDistribution =
FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
fileToBlockDistribution.getForPath(fs, testFile);
long uniqueBlocksTotalWeight =
blocksDistribution.getUniqueBlocksTotalWeight();
for (String host : hosts) {
@ -163,9 +189,8 @@ public class TestFSUtils {
long weight;
long uniqueBlocksTotalWeight;
do {
FileStatus status = fs.getFileStatus(testFile);
HDFSBlocksDistribution blocksDistribution =
FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
fileToBlockDistribution.getForPath(fs, testFile);
uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();
String tophost = blocksDistribution.getTopHosts().get(0);
@ -197,8 +222,7 @@ public class TestFSUtils {
final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
HDFSBlocksDistribution blocksDistribution;
do {
FileStatus status = fs.getFileStatus(testFile);
blocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
// NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
}
while (blocksDistribution.getTopHosts().size() != 3 &&