From 102c2bb129dea33d2de928b4888e3e09fd0e04ca Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Fri, 22 Oct 2021 13:04:45 -0400 Subject: [PATCH] HubSpot Backport HBASE-26304: Reflect out of band locality improvements in metrics and balancer --- .../src/main/resources/hbase-default.xml | 19 ++ .../org/apache/hadoop/hbase/io/FileLink.java | 15 ++ .../master/balancer/RegionLocationFinder.java | 64 ++++++- .../hadoop/hbase/regionserver/HStoreFile.java | 16 +- .../InputStreamBlockDistribution.java | 143 +++++++++++++++ .../org/apache/hadoop/hbase/util/FSUtils.java | 35 ++++ .../apache/hadoop/hbase/io/TestFileLink.java | 35 ++++ .../TestInputStreamBlockDistribution.java | 165 ++++++++++++++++++ .../apache/hadoop/hbase/util/TestFSUtils.java | 41 ++++- 9 files changed, 521 insertions(+), 12 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index b73274aa14e..31c2de0c73c 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -2033,4 +2033,23 @@ possible configurations would overwhelm and obscure the important. the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size + + hbase.locality.inputstream.derive.enabled + false + + If true, derive StoreFile locality metrics from the underlying DFSInputStream + backing reads for that StoreFile. This value will update as the DFSInputStream's + block locations are updated over time. Otherwise, locality is computed on StoreFile + open, and cached until the StoreFile is closed. + + + + hbase.locality.inputstream.derive.cache.period + 60000 + + If deriving StoreFile locality metrics from the underlying DFSInputStream, how + long should the derived values be cached for. The derivation process may involve + hitting the namenode, if the DFSInputStream's block list is incomplete. + + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java index ba84606fd04..ea285ed53fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java @@ -126,6 +126,10 @@ public class FileLink { this.in = tryOpen(); } + private FSDataInputStream getUnderlyingInputStream() { + return in; + } + @Override public int read() throws IOException { int res; @@ -475,6 +479,17 @@ public class FileLink { return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize)); } + /** + * If the passed FSDataInputStream is backed by a FileLink, returns the underlying + * InputStream for the resolved link target. Otherwise, returns null. + */ + public static FSDataInputStream getUnderlyingFileLinkInputStream(FSDataInputStream stream) { + if (stream.getWrappedStream() instanceof FileLinkInputStream) { + return ((FileLinkInputStream) stream.getWrappedStream()).getUnderlyingInputStream(); + } + return null; + } + /** * NOTE: This method must be used only in the constructor! * It creates a List with the specified locations for the link. 
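The two properties above feed the new InputStreamBlockDistribution class introduced further down in this patch. As a hedged illustration only (the helper class below is hypothetical and not part of the patch; the property keys and the 60000ms default come from the hbase-default.xml entries above, the 30s value is an arbitrary example), this is how the feature could be enabled programmatically, e.g. in a test configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class DerivedLocalityConfigExample { // hypothetical helper, illustration only
      public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        // Derive StoreFile locality from the open DFSInputStream instead of computing it once at open.
        conf.setBoolean("hbase.locality.inputstream.derive.enabled", true);
        // Re-derive (possibly hitting the namenode) at most every 30s; the shipped default is 60000ms.
        conf.setInt("hbase.locality.inputstream.derive.cache.period", 30_000);
        return conf;
      }
    }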
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index fb7731fa756..710bbb11d86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -30,6 +31,8 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -63,6 +66,7 @@ class RegionLocationFinder { private static final long CACHE_TIME = 240 * 60 * 1000; private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution(); private Configuration conf; + private static final float EPSILON = 0.0001f; private volatile ClusterMetrics status; private MasterServices services; private final ListeningExecutorService executor; @@ -127,12 +131,68 @@ class RegionLocationFinder { public void setClusterMetrics(ClusterMetrics status) { long currentTime = EnvironmentEdgeManager.currentTime(); - this.status = status; + if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) { + this.status = status; // Only count the refresh if it includes user tables ( eg more than meta and namespace ). - lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh; + lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh; + } else { + refreshLocalityChangedRegions(this.status, status); + this.status = status; + } + } + + /** + * If locality for a region has changed, that pretty certainly means our cache is out of date. + * Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality. 
+   */
+  private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
+    if (oldStatus == null || newStatus == null) {
+      LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}",
+        oldStatus, newStatus);
+      return;
+    }
+
+    Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
+    Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();
+
+    Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
+    for (RegionInfo regionInfo : cache.asMap().keySet()) {
+      regionsByName.put(regionInfo.getEncodedName(), regionInfo);
+    }
+
+    for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
+      Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
+      for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
+        String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
+        RegionInfo region = regionsByName.get(encodedName);
+        if (region == null) {
+          continue;
+        }
+
+        float newLocality = regionEntry.getValue().getDataLocality();
+        float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);
+
+        if (Math.abs(newLocality - oldLocality) > EPSILON) {
+          LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
+            region.getEncodedName(), oldLocality, newLocality);
+          cache.refresh(region);
+        }
+      }
+
+    }
+  }
+
+  private float getOldLocality(ServerName newServer, byte[] regionName,
+    Map<ServerName, ServerMetrics> oldServers) {
+    ServerMetrics serverMetrics = oldServers.get(newServer);
+    if (serverMetrics == null) {
+      return -1f;
+    }
+    RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
+    if (regionMetrics == null) {
+      return -1f;
+    }
+
+    return regionMetrics.getDataLocality();
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 9f8a717d21f..039a2c69c6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -29,6 +29,7 @@ import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,6 +124,7 @@ public class HStoreFile implements StoreFile {
 
   // StoreFile.Reader
   private volatile StoreFileReader initialReader;
+  private volatile InputStreamBlockDistribution initialReaderBlockDistribution = null;
 
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
@@ -347,7 +350,11 @@ public class HStoreFile implements StoreFile {
    * file is opened.
*/ public HDFSBlocksDistribution getHDFSBlockDistribution() { - return this.fileInfo.getHDFSBlockDistribution(); + if (initialReaderBlockDistribution != null) { + return initialReaderBlockDistribution.getHDFSBlockDistribution(); + } else { + return this.fileInfo.getHDFSBlockDistribution(); + } } /** @@ -365,6 +372,13 @@ public class HStoreFile implements StoreFile { fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader()); } this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader); + + if (InputStreamBlockDistribution.isEnabled(fileInfo.getConf())) { + boolean useHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum(); + FSDataInputStream stream = context.getInputStreamWrapper().getStream(useHBaseChecksum); + this.initialReaderBlockDistribution = new InputStreamBlockDistribution(stream, fileInfo); + } + // Load up indices and fileinfo. This also loads Bloom filter type. metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java new file mode 100644 index 00000000000..7bd33a6f31d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.errorprone.annotations.RestrictedApi; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.io.FileLink; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Computes the HDFSBlockDistribution for a file based on the underlying located blocks + * for an HdfsDataInputStream reading that file. This computation may involve a call to + * the namenode, so the value is cached based on + * {@link #HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD}. 
+ */ +@InterfaceAudience.Private +public class InputStreamBlockDistribution { + private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class); + + private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = + "hbase.locality.inputstream.derive.enabled"; + private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false; + + private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = + "hbase.locality.inputstream.derive.cache.period"; + private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000; + + private final FSDataInputStream stream; + private final StoreFileInfo fileInfo; + private final int cachePeriodMs; + + private HDFSBlocksDistribution hdfsBlocksDistribution; + private long lastCachedAt; + private boolean streamUnsupported; + + public InputStreamBlockDistribution(FSDataInputStream stream, StoreFileInfo fileInfo) { + this.stream = stream; + this.fileInfo = fileInfo; + this.cachePeriodMs = fileInfo.getConf().getInt( + HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD, + DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD); + this.lastCachedAt = EnvironmentEdgeManager.currentTime(); + this.streamUnsupported = false; + this.hdfsBlocksDistribution = fileInfo.getHDFSBlockDistribution(); + } + + /** + * True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream + */ + public static boolean isEnabled(Configuration conf) { + return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED, + DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED); + } + + /** + * Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache + * is expired. + */ + public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() { + if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) { + try { + LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo); + computeBlockDistribution(); + } catch (IOException e) { + LOG.warn("Failed to recompute block distribution for {}. Falling back on cached value.", + fileInfo, e); + } + } + return hdfsBlocksDistribution; + } + + private void computeBlockDistribution() throws IOException { + lastCachedAt = EnvironmentEdgeManager.currentTime(); + + FSDataInputStream stream; + if (fileInfo.isLink()) { + stream = FileLink.getUnderlyingFileLinkInputStream(this.stream); + } else { + stream = this.stream; + } + + if (!(stream instanceof HdfsDataInputStream)) { + if (!streamUnsupported) { + LOG.warn("{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be " + + "used to derive locality. 
Falling back on cached value.", + stream, fileInfo, fileInfo.isLink()); + streamUnsupported = true; + } + return; + } + + streamUnsupported = false; + hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) stream); + } + + /** + * For tests only, sets lastCachedAt so we can force a refresh + */ + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + synchronized void setLastCachedAt(long timestamp) { + lastCachedAt = timestamp; + } + + /** + * For tests only, returns the configured cache period + */ + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + long getCachePeriodMs() { + return cachePeriodMs; + } + + /** + * For tests only, returns whether the passed stream is supported + */ + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + boolean isStreamUnsupported() { + return streamUnsupported; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 91775b9dac2..b1975fae14f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -79,7 +79,10 @@ import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.Progressable; @@ -705,6 +708,38 @@ public final class FSUtils { return fs.exists(metaRegionDir); } + /** + * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams + * are backed by a series of LocatedBlocks, which are fetched periodically from the namenode. + * This method retrieves those blocks from the input stream and uses them to calculate + * HDFSBlockDistribution. + * + * The underlying method in DFSInputStream does attempt to use locally cached blocks, but + * may hit the namenode if the cache is determined to be incomplete. The method also involves + * making copies of all LocatedBlocks rather than return the underlying blocks themselves. 
+   */
+  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(HdfsDataInputStream inputStream)
+    throws IOException {
+    List<LocatedBlock> blocks = inputStream.getAllBlocks();
+    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
+    for (LocatedBlock block : blocks) {
+      String[] hosts = getHostsForLocations(block);
+      long len = block.getBlockSize();
+      StorageType[] storageTypes = block.getStorageTypes();
+      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
+    }
+    return blocksDistribution;
+  }
+
+  private static String[] getHostsForLocations(LocatedBlock block) {
+    DatanodeInfo[] locations = block.getLocations();
+    String[] hosts = new String[locations.length];
+    for (int i = 0; i < hosts.length; i++) {
+      hosts[i] = locations[i].getHostName();
+    }
+    return hosts;
+  }
+
   /**
    * Compute HDFS blocks distribution of a given file, or a portion of the file
    * @param fs file system
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
index 879606807c8..52e56cbf4d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -88,6 +89,40 @@ public class TestFileLink {
     assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering
   }
 
+  /**
+   * Test that the returned link from {@link FileLink#open(FileSystem)} can be unwrapped
+   * to a {@link HdfsDataInputStream} by
+   * {@link FileLink#getUnderlyingFileLinkInputStream(FSDataInputStream)}
+   */
+  @Test
+  public void testGetUnderlyingFSDataInputStream() throws Exception {
+    HBaseTestingUtility testUtil = new HBaseTestingUtility();
+    Configuration conf = testUtil.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
+
+    testUtil.startMiniDFSCluster(1);
+    try {
+      MiniDFSCluster cluster = testUtil.getDFSCluster();
+      FileSystem fs = cluster.getFileSystem();
+
+      Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
+
+      writeSomeData(fs, originalPath, 256 << 20, (byte) 2);
+
+      List<Path> files = new ArrayList<>();
+      files.add(originalPath);
+
+      FileLink link = new FileLink(files);
+      FSDataInputStream stream = link.open(fs);
+
+      FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream);
+      assertTrue(underlying instanceof HdfsDataInputStream);
+    } finally {
+      testUtil.shutdownMiniCluster();
+    }
+  }
+
   /**
    * Test, on HDFS, that the FileLink is still readable
    * even when the current file gets renamed.
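For reference, the FSUtils and FileLink additions above compose the same way InputStreamBlockDistribution.computeBlockDistribution() (earlier in this patch) uses them: unwrap a FileLink-backed stream if one is present, then derive the distribution from the HdfsDataInputStream. A minimal sketch under those assumptions; the class name is hypothetical and an HDFS-backed FileSystem is assumed:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HDFSBlocksDistribution;
    import org.apache.hadoop.hbase.io.FileLink;
    import org.apache.hadoop.hbase.util.FSUtils;
    import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

    public class StreamLocalitySketch { // hypothetical, illustration only
      static HDFSBlocksDistribution localityOf(FileSystem fs, Path path) throws IOException {
        try (FSDataInputStream in = fs.open(path)) {
          // If the stream wraps a FileLink, resolve to the underlying stream first (null otherwise).
          FSDataInputStream unwrapped = FileLink.getUnderlyingFileLinkInputStream(in);
          FSDataInputStream candidate = unwrapped != null ? unwrapped : in;
          if (candidate instanceof HdfsDataInputStream) {
            // Uses the located blocks already tracked by the stream; may consult the namenode.
            return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) candidate);
          }
          // Non-HDFS streams expose no located blocks; return an empty distribution.
          return new HDFSBlocksDistribution();
        }
      }
    }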
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java new file mode 100644 index 00000000000..daf6cd42b7f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java @@ -0,0 +1,165 @@ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.io.FileLink; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class}) +public class TestInputStreamBlockDistribution { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestInputStreamBlockDistribution.class); + + private Configuration conf; + private FileSystem fs; + private Path testPath; + + @Before + public void setUp() throws Exception { + HBaseTestingUtility testUtil = new HBaseTestingUtility(); + conf = testUtil.getConfiguration(); + conf.setInt("dfs.blocksize", 1024 * 1024); + conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024); + + testUtil.startMiniDFSCluster(1); + MiniDFSCluster cluster = testUtil.getDFSCluster(); + fs = cluster.getFileSystem(); + + testPath = new Path(testUtil.getDefaultRootDirPath(), "test.file"); + + writeSomeData(fs, testPath, 256 << 20, (byte)2); + } + + @After + public void tearDown() throws Exception { + fs.delete(testPath, false); + fs.close(); + } + + @Test + public void itDerivesLocalityFromHFileInputStream() throws Exception { + try (FSDataInputStream stream = fs.open(testPath)) { + HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); + InputStreamBlockDistribution test = + new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, false)); + + assertSame(initial, test.getHDFSBlockDistribution()); + + test.setLastCachedAt(test.getCachePeriodMs() + 1); + + assertNotSame(initial, test.getHDFSBlockDistribution()); + } + + } + + @Test + public void itDerivesLocalityFromFileLinkInputStream() throws Exception { + List files = new ArrayList(); + files.add(testPath); + + FileLink link = new FileLink(files); + try (FSDataInputStream stream = link.open(fs)) { + + HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); + + InputStreamBlockDistribution test = new InputStreamBlockDistribution(stream, + getMockedStoreFileInfo(initial, true)); + + assertSame(initial, test.getHDFSBlockDistribution()); + + 
test.setLastCachedAt(test.getCachePeriodMs() + 1); + + assertNotSame(initial, test.getHDFSBlockDistribution()); + } + } + + @Test + public void itFallsBackOnLastKnownValueWhenUnsupported() { + FSDataInputStream fakeStream = mock(FSDataInputStream.class); + HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); + + InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream, + getMockedStoreFileInfo(initial, false)); + + assertSame(initial, test.getHDFSBlockDistribution()); + test.setLastCachedAt(test.getCachePeriodMs() + 1); + + // fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve + assertSame(initial, test.getHDFSBlockDistribution()); + assertTrue(test.isStreamUnsupported()); + } + + @Test + public void itFallsBackOnLastKnownValueOnException() throws IOException { + HdfsDataInputStream fakeStream = mock(HdfsDataInputStream.class); + when(fakeStream.getAllBlocks()).thenThrow(new IOException("test")); + + HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); + + InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream, + getMockedStoreFileInfo(initial, false)); + + assertSame(initial, test.getHDFSBlockDistribution()); + test.setLastCachedAt(test.getCachePeriodMs() + 1); + + // fakeStream throws an exception, so falls back on original + assertSame(initial, test.getHDFSBlockDistribution()); + + assertFalse(test.isStreamUnsupported()); + } + + /** + * Write up to 'size' bytes with value 'v' into a new file called 'path'. + */ + private void writeSomeData (FileSystem fs, Path path, long size, byte v) throws IOException { + byte[] data = new byte[4096]; + for (int i = 0; i < data.length; i++) { + data[i] = v; + } + + FSDataOutputStream stream = fs.create(path); + try { + long written = 0; + while (written < size) { + stream.write(data, 0, data.length); + written += data.length; + } + } finally { + stream.close(); + } + } + + private StoreFileInfo getMockedStoreFileInfo(HDFSBlocksDistribution distribution, + boolean isFileLink) { + StoreFileInfo mock = mock(StoreFileInfo.class); + when(mock.getHDFSBlockDistribution()) + .thenReturn(distribution); + when(mock.getConf()).thenReturn(conf); + when(mock.isLink()).thenReturn(isFileLink); + return mock; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java index 24ad6efdd54..4a116e27bfc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java @@ -28,6 +28,8 @@ import java.io.File; import java.io.IOException; import java.util.List; import java.util.Random; +import java.util.function.Function; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -52,6 +54,7 @@ import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; @@ -105,7 +108,30 @@ public class TestFSUtils { out.close(); } - @Test public void testcomputeHDFSBlocksDistribution() throws Exception { + @Test + public void testComputeHDFSBlocksDistributionByInputStream() throws Exception { + testComputeHDFSBlocksDistribution((fs, testFile) -> { + try 
(FSDataInputStream open = fs.open(testFile)) { + assertTrue(open instanceof HdfsDataInputStream); + return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open); + } + }); + } + + @Test + public void testComputeHDFSBlockDistribution() throws Exception { + testComputeHDFSBlocksDistribution((fs, testFile) -> { + FileStatus status = fs.getFileStatus(testFile); + return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); + }); + } + + @FunctionalInterface + interface HDFSBlockDistributionFunction { + HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException; + } + + private void testComputeHDFSBlocksDistribution(HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception { final int DEFAULT_BLOCK_SIZE = 1024; conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE); MiniDFSCluster cluster = null; @@ -129,9 +155,9 @@ public class TestFSUtils { boolean ok; do { ok = true; - FileStatus status = fs.getFileStatus(testFile); - HDFSBlocksDistribution blocksDistribution = - FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); + + HDFSBlocksDistribution blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile); + long uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight(); for (String host : hosts) { @@ -163,9 +189,7 @@ public class TestFSUtils { long weight; long uniqueBlocksTotalWeight; do { - FileStatus status = fs.getFileStatus(testFile); - HDFSBlocksDistribution blocksDistribution = - FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); + HDFSBlocksDistribution blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile); uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight(); String tophost = blocksDistribution.getTopHosts().get(0); @@ -196,8 +220,7 @@ public class TestFSUtils { final long maxTime = System.currentTimeMillis() + 2000; HDFSBlocksDistribution blocksDistribution; do { - FileStatus status = fs.getFileStatus(testFile); - blocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); + blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile); // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175 } while (blocksDistribution.getTopHosts().size() != 3 && System.currentTimeMillis() < maxTime);
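The TestFSUtils changes above run the existing locality assertions against both ways of computing an HDFSBlocksDistribution (file status vs. open stream). For intuition about the metric those assertions, and ultimately the balancer, consume, a small worked example against the long-standing HDFSBlocksDistribution API; the class and host names below are illustrative only:

    import org.apache.hadoop.hbase.HDFSBlocksDistribution;

    public class LocalityIndexSketch { // hypothetical, illustration only
      public static void main(String[] args) {
        HDFSBlocksDistribution dist = new HDFSBlocksDistribution();
        // Two 128MB blocks: host-a holds a replica of both, host-b and host-c hold one each.
        dist.addHostsAndBlockWeight(new String[] { "host-a", "host-b" }, 128 << 20);
        dist.addHostsAndBlockWeight(new String[] { "host-a", "host-c" }, 128 << 20);
        System.out.println(dist.getUniqueBlocksTotalWeight()); // 268435456 (two unique 128MB blocks)
        System.out.println(dist.getBlockLocalityIndex("host-a")); // 1.0: every block has a local replica
        System.out.println(dist.getBlockLocalityIndex("host-b")); // 0.5: half the block weight is local
      }
    }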