From 3ab49af40fb9d23bcea9d4dee68a5bde4e1cdca7 Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Wed, 10 Aug 2011 22:16:30 +0000
Subject: [PATCH] HBASE-4114 Metrics for HFile HDFS block locality

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1156390 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                        |   1 +
 .../hadoop/hbase/HDFSBlocksDistribution.java       | 234 ++++++++++++++++++
 .../apache/hadoop/hbase/master/HMaster.java        |   3 +
 .../hadoop/hbase/master/LoadBalancer.java          | 159 ++++++------
 .../hadoop/hbase/regionserver/HRegion.java         |  56 +++++
 .../hbase/regionserver/HRegionServer.java          |  10 +
 .../hadoop/hbase/regionserver/StoreFile.java       |  88 ++++++-
 .../metrics/RegionServerMetrics.java               |   9 +
 .../org/apache/hadoop/hbase/util/FSUtils.java      |  28 +++
 .../hadoop/hbase/HBaseTestingUtility.java          |  75 +++++-
 .../hbase/regionserver/TestHRegion.java            |  66 +++++
 .../apache/hadoop/hbase/util/TestFSUtils.java      | 102 ++++++++
 12 files changed, 746 insertions(+), 85 deletions(-)
 create mode 100644 src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java

diff --git a/CHANGES.txt b/CHANGES.txt
index 0e8bb6e79ca..db617ac8c87 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -421,6 +421,7 @@ Release 0.91.0 - Unreleased
               (Nichole Treadway and Nicholas Telford)
    HBASE-2233  Support both Hadoop 0.20 and 0.22
    HBASE-3857  Change the HFile Format (Mikhail & Liyin)
+   HBASE-4114  Metrics for HFile HDFS block locality (Ming Ma)
 
 Release 0.90.5 - Unreleased
 
diff --git a/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java b/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java
new file mode 100644
index 00000000000..29c59afb40a
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java
@@ -0,0 +1,234 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+
+/**
+ * Data structure to describe the distribution of HDFS blocks among hosts
+ */
+public class HDFSBlocksDistribution {
+  private Map<String,HostAndWeight> hostAndWeights = null;
+  private long uniqueBlocksTotalWeight = 0;
+
+  /**
+   * Stores the hostname and weight for that hostname.
+   *
+   * This is used when determining the physical locations of the blocks making
+   * up a region.
+   *
+   * To make a prioritized list of the hosts holding the most data of a region,
+   * this class is used to count the total weight for each host. The weight is
+   * currently just the size of the file.
+   */
+  public static class HostAndWeight {
+
+    private String host;
+    private long weight;
+
+    /**
+     * Constructor
+     * @param host the host name
+     * @param weight the weight
+     */
+    public HostAndWeight(String host, long weight) {
+      this.host = host;
+      this.weight = weight;
+    }
+
+    /**
+     * add weight
+     * @param weight the weight
+     */
+    public void addWeight(long weight) {
+      this.weight += weight;
+    }
+
+    /**
+     * @return the host name
+     */
+    public String getHost() {
+      return host;
+    }
+
+    /**
+     * @return the weight
+     */
+    public long getWeight() {
+      return weight;
+    }
+
+    /**
+     * comparator used to sort hosts based on weight
+     */
+    public static class WeightComparator implements Comparator<HostAndWeight> {
+      @Override
+      public int compare(HostAndWeight l, HostAndWeight r) {
+        if(l.getWeight() == r.getWeight()) {
+          return l.getHost().compareTo(r.getHost());
+        }
+        return l.getWeight() < r.getWeight() ? -1 : 1;
+      }
+    }
+  }
+
+  /**
+   * Constructor
+   */
+  public HDFSBlocksDistribution() {
+    this.hostAndWeights =
+      new TreeMap<String,HostAndWeight>();
+  }
+
+  /**
+   * @see java.lang.Object#toString()
+   */
+  @Override
+  public synchronized String toString() {
+    return "number of unique hosts in the distribution=" +
+      this.hostAndWeights.size();
+  }
+
+  /**
+   * add some weight to a list of hosts and update the total unique block
+   * weight
+   * @param hosts the list of hosts
+   * @param weight the weight
+   */
+  public void addHostsAndBlockWeight(String[] hosts, long weight) {
+    if (hosts == null || hosts.length == 0) {
+      throw new NullPointerException("empty hosts");
+    }
+    addUniqueWeight(weight);
+    for (String hostname : hosts) {
+      addHostAndBlockWeight(hostname, weight);
+    }
+  }
+
+  /**
+   * add some weight to the total unique weight
+   * @param weight the weight
+   */
+  private void addUniqueWeight(long weight) {
+    uniqueBlocksTotalWeight += weight;
+  }
+
+
+  /**
+   * add some weight to a specific host
+   * @param host the host name
+   * @param weight the weight
+   */
+  private void addHostAndBlockWeight(String host, long weight) {
+    if (host == null) {
+      throw new NullPointerException("Passed hostname is null");
+    }
+
+    HostAndWeight hostAndWeight = this.hostAndWeights.get(host);
+    if(hostAndWeight == null) {
+      hostAndWeight = new HostAndWeight(host, weight);
+      this.hostAndWeights.put(host, hostAndWeight);
+    } else {
+      hostAndWeight.addWeight(weight);
+    }
+  }
+
+  /**
+   * @return the hosts and their weights
+   */
+  public Map<String,HostAndWeight> getHostAndWeights() {
+    return this.hostAndWeights;
+  }
+
+  /**
+   * return the weight for a specific host; that is, the total bytes of all
+   * blocks on the host
+   * @param host the host name
+   * @return the weight of the given host
+   */
+  public long getWeight(String host) {
+    long weight = 0;
+    if (host != null) {
+      HostAndWeight hostAndWeight = this.hostAndWeights.get(host);
+      if(hostAndWeight != null) {
+        weight = hostAndWeight.getWeight();
+      }
+    }
+    return weight;
+  }
+
+  /**
+   * @return the sum of all unique blocks' weight
+   */
+  public long getUniqueBlocksTotalWeight() {
+    return uniqueBlocksTotalWeight;
+  }
+
+  /**
+   * return the locality index of a given host
+   * @param host the host name
+   * @return the locality index of the given host
+   */
+  public float getBlockLocalityIndex(String host) {
+    float localityIndex = 0;
+    HostAndWeight hostAndWeight = this.hostAndWeights.get(host);
+    if (hostAndWeight != null && uniqueBlocksTotalWeight != 0) {
+      localityIndex = (float)hostAndWeight.weight / (float)uniqueBlocksTotalWeight;
+    }
+    return localityIndex;
+  }
+
+
+  /**
+   * This will add the distribution from input to this object
+   * @param otherBlocksDistribution the other hdfs blocks distribution
+   */
+  public void add(HDFSBlocksDistribution otherBlocksDistribution) {
+    Map<String,HostAndWeight> otherHostAndWeights =
+      otherBlocksDistribution.getHostAndWeights();
+    for (Map.Entry<String,HostAndWeight> otherHostAndWeight:
+      otherHostAndWeights.entrySet()) {
+      addHostAndBlockWeight(otherHostAndWeight.getValue().host,
+        otherHostAndWeight.getValue().weight);
+    }
+    addUniqueWeight(otherBlocksDistribution.getUniqueBlocksTotalWeight());
+  }
+
+  /**
+   * @return the list of hosts, sorted by weight in descending order
+   */
+  public List<String> getTopHosts() {
+    NavigableSet<HostAndWeight> orderedHosts = new TreeSet<HostAndWeight>(
+      new HostAndWeight.WeightComparator());
+    orderedHosts.addAll(this.hostAndWeights.values());
+    List<String> topHosts = new ArrayList<String>(orderedHosts.size());
+    for(HostAndWeight haw : orderedHosts.descendingSet()) {
+      topHosts.add(haw.getHost());
+    }
+    return topHosts;
+  }
+
+}
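The new class is self-contained, so its accounting can be exercised directly. A minimal sketch follows (an editorial illustration; the host names and block sizes are hypothetical, not part of the patch):

    import org.apache.hadoop.hbase.HDFSBlocksDistribution;

    public class BlocksDistributionExample {
      public static void main(String[] args) {
        // Each addHostsAndBlockWeight() call records one block: its length is
        // added once to the unique-block total and once per replica host.
        HDFSBlocksDistribution dist = new HDFSBlocksDistribution();
        dist.addHostsAndBlockWeight(new String[] {"host1", "host2"}, 64L << 20);
        dist.addHostsAndBlockWeight(new String[] {"host2", "host3"}, 64L << 20);

        // host2 holds a replica of every unique block; host1 holds half.
        System.out.println(dist.getTopHosts().get(0));           // host2
        System.out.println(dist.getBlockLocalityIndex("host2")); // 1.0
        System.out.println(dist.getBlockLocalityIndex("host1")); // 0.5
      }
    }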
diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 8beeb68d8cb..a00b93de847 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -466,6 +466,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
     status.setStatus("Starting assignment manager");
     this.assignmentManager.joinCluster();
 
+    this.balancer.setClusterStatus(getClusterStatus());
+    this.balancer.setMasterServices(this);
+
     // Start balancer and meta catalog janitor after meta and regions have
     // been assigned.
     status.setStatus("Starting balancer and catalog janitor");
diff --git a/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index aaca6eb4442..05460b649eb 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -19,29 +19,32 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.NavigableSet;
 import java.util.Random;
 import java.util.TreeMap;
-import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.collect.MinMaxPriorityQueue;
 
@@ -66,11 +69,23 @@ public class LoadBalancer {
   private static final Random RANDOM = new Random(System.currentTimeMillis());
   // slop for regions
   private float slop;
+  private Configuration config;
+  private ClusterStatus status;
+  private MasterServices services;
 
   LoadBalancer(Configuration conf) {
     this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
     if (slop < 0) slop = 0;
     else if (slop > 1) slop = 1;
+    this.config = conf;
+  }
+
+  public void setClusterStatus(ClusterStatus st) {
+    this.status = st;
+  }
+
+  public void setMasterServices(MasterServices masterServices) {
+    this.services = masterServices;
   }
 
   /*
@@ -600,106 +615,96 @@ public class LoadBalancer {
   }
 
   /**
-   * Find the block locations for all of the files for the specified region.
-   *
    * Returns an ordered list of hosts that are hosting the blocks for this
    * region. The weight of each host is the sum of the block lengths of all
    * files on that host, so the first host in the list is the server which
    * holds the most bytes of the given region's HFiles.
    *
-   * TODO: Make this work. Need to figure out how to match hadoop's hostnames
-   * given for block locations with our HServerAddress.
-   * TODO: Use the right directory for the region
-   * TODO: Use getFileBlockLocations on the files not the directory
-   *
    * @param fs the filesystem
    * @param region region
    * @return ordered list of hosts holding blocks of the specified region
-   * @throws IOException if any filesystem errors
    */
   @SuppressWarnings("unused")
-  private List<String> getTopBlockLocations(FileSystem fs, HRegionInfo region)
-  throws IOException {
-    String encodedName = region.getEncodedName();
-    Path path = new Path("/hbase/table/" + encodedName);
-    FileStatus status = fs.getFileStatus(path);
-    BlockLocation [] blockLocations =
-      fs.getFileBlockLocations(status, 0, status.getLen());
-    Map<HostAndWeight, HostAndWeight> hostWeights =
-      new TreeMap<HostAndWeight, HostAndWeight>(new HostAndWeight.HostComparator());
-    for(BlockLocation bl : blockLocations) {
-      String [] hosts = bl.getHosts();
-      long len = bl.getLength();
-      for(String host : hosts) {
-        HostAndWeight haw = hostWeights.get(host);
-        if(haw == null) {
-          haw = new HostAndWeight(host, len);
-          hostWeights.put(haw, haw);
-        } else {
-          haw.addWeight(len);
-        }
+  private List<ServerName> getTopBlockLocations(FileSystem fs,
+    HRegionInfo region) {
+    List<ServerName> topServerNames = null;
+    try {
+      HTableDescriptor tableDescriptor = getTableDescriptor(
+        region.getTableName());
+      if (tableDescriptor != null) {
+        HDFSBlocksDistribution blocksDistribution =
+          HRegion.computeHDFSBlocksDistribution(config, tableDescriptor,
+          region.getEncodedName());
+        List<String> topHosts = blocksDistribution.getTopHosts();
+        topServerNames = mapHostNameToServerName(topHosts);
       }
+    } catch (IOException ioe) {
+      LOG.debug("IOException during HDFSBlocksDistribution computation for " +
+        "region = " + region.getEncodedName(), ioe);
     }
-    NavigableSet<HostAndWeight> orderedHosts = new TreeSet<HostAndWeight>(
-      new HostAndWeight.WeightComparator());
-    orderedHosts.addAll(hostWeights.values());
-    List<String> topHosts = new ArrayList<String>(orderedHosts.size());
-    for(HostAndWeight haw : orderedHosts.descendingSet()) {
-      topHosts.add(haw.getHost());
-    }
-    return topHosts;
+
+    return topServerNames;
   }
 
   /**
-   * Stores the hostname and weight for that hostname.
-   *
-   * This is used when determining the physical locations of the blocks making
-   * up a region.
-   *
-   * To make a prioritized list of the hosts holding the most data of a region,
-   * this class is used to count the total weight for each host. The weight is
-   * currently just the size of the file.
+   * return the HTableDescriptor for a given table name
+   * @param tableName the table name
+   * @return HTableDescriptor
+   * @throws IOException
    */
-  private static class HostAndWeight {
-
-    private final String host;
-    private long weight;
-
-    public HostAndWeight(String host, long weight) {
-      this.host = host;
-      this.weight = weight;
+  private HTableDescriptor getTableDescriptor(byte[] tableName)
+  throws IOException {
+    HTableDescriptor tableDescriptor = null;
+    try {
+      if (this.services != null) {
+        tableDescriptor = this.services.getTableDescriptors().
+          get(Bytes.toString(tableName));
+      }
+    } catch (TableExistsException tee) {
+      LOG.debug("TableExistsException during getTableDescriptors." +
+        " Current table name = " + Bytes.toString(tableName), tee);
+    } catch (FileNotFoundException fnfe) {
+      LOG.debug("FileNotFoundException during getTableDescriptors." +
+        " Current table name = " + Bytes.toString(tableName), fnfe);
     }
 
-    public void addWeight(long weight) {
-      this.weight += weight;
+    return tableDescriptor;
   }
 
-    public String getHost() {
-      return host;
+  /**
+   * Map hostnames to ServerNames. The output list preserves the order of the
+   * input hosts.
+   * @param hosts the list of hosts
+   * @return ServerName list
+   */
+  private List<ServerName> mapHostNameToServerName(List<String> hosts) {
+    if (hosts == null || status == null) {
+      return null;
     }
 
-    public long getWeight() {
-      return weight;
-    }
+    List<ServerName> topServerNames = new ArrayList<ServerName>();
+    Collection<ServerName> regionServers = status.getServers();
 
-    private static class HostComparator implements Comparator<HostAndWeight> {
-      @Override
-      public int compare(HostAndWeight l, HostAndWeight r) {
-        return l.getHost().compareTo(r.getHost());
-      }
-    }
-
-    private static class WeightComparator implements Comparator<HostAndWeight> {
-      @Override
-      public int compare(HostAndWeight l, HostAndWeight r) {
-        if(l.getWeight() == r.getWeight()) {
-          return l.getHost().compareTo(r.getHost());
+    // create a mapping from hostname to ServerName for fast lookup
+    HashMap<String, ServerName> hostToServerName =
+      new HashMap<String, ServerName>();
+    for (ServerName sn : regionServers) {
+      hostToServerName.put(sn.getHostname(), sn);
     }
-        return l.getWeight() < r.getWeight() ? -1 : 1;
+
+    for (String host : hosts) {
+      ServerName sn = hostToServerName.get(host);
+      // it is possible that HDFS is up (thus host is valid),
+      // but the RS is down (thus sn is null)
+      if (sn != null) {
+        topServerNames.add(sn);
       }
     }
+    return topServerNames;
   }
 
+
   /**
    * Generates an immediate assignment plan to be used by a new master for
    * regions in transition that do not have an already known destination.
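The mapping step above is what finally ties HDFS hostnames to region servers (the old TODO). A minimal standalone sketch of the same idea, using plain strings in place of ServerName and entirely hypothetical hosts:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class HostMappingExample {
      public static void main(String[] args) {
        // Live region servers indexed by hostname (stand-ins for ServerName).
        Map<String, String> hostToServerName = new HashMap<String, String>();
        hostToServerName.put("host1", "host1,60020,1312950000000");
        hostToServerName.put("host3", "host3,60020,1312950000001");

        // Hosts ordered by block weight, heaviest first, as getTopHosts() returns.
        List<String> topHosts = Arrays.asList("host2", "host1", "host3");

        List<String> topServerNames = new ArrayList<String>();
        for (String host : topHosts) {
          String sn = hostToServerName.get(host);
          if (sn != null) { // host2 has HDFS replicas but no live RS: skipped
            topServerNames.add(sn);
          }
        }
        System.out.println(topServerNames); // weight order preserved, host2 dropped
      }
    }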
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 2e745a5feb8..83ff7b2e5d7 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -461,6 +462,61 @@ public class HRegion implements HeapSize { // , Writable{
     return false;
   }
 
+  /**
+   * Returns the HDFS blocks distribution based on the data captured when the
+   * HFiles were created.
+   * @return The HDFS blocks distribution for the region.
+   */
+  public HDFSBlocksDistribution getHDFSBlocksDistribution() {
+    HDFSBlocksDistribution hdfsBlocksDistribution =
+      new HDFSBlocksDistribution();
+    synchronized (this.stores) {
+      for (Store store : this.stores.values()) {
+        for (StoreFile sf : store.getStorefiles()) {
+          HDFSBlocksDistribution storeFileBlocksDistribution =
+            sf.getHDFSBlockDistribution();
+          hdfsBlocksDistribution.add(storeFileBlocksDistribution);
+        }
+      }
+    }
+    return hdfsBlocksDistribution;
+  }
+
+  /**
+   * This is a helper function to compute the HDFS block distribution on demand
+   * @param conf configuration
+   * @param tableDescriptor HTableDescriptor of the table
+   * @param regionEncodedName encoded name of the region
+   * @return The HDFS blocks distribution for the given region.
+   * @throws IOException
+   */
+  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
+    Configuration conf, HTableDescriptor tableDescriptor,
+    String regionEncodedName) throws IOException {
+    HDFSBlocksDistribution hdfsBlocksDistribution =
+      new HDFSBlocksDistribution();
+    Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf),
+      tableDescriptor.getName());
+    FileSystem fs = tablePath.getFileSystem(conf);
+
+    for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
+      Path storeHomeDir = Store.getStoreHomedir(tablePath, regionEncodedName,
+        family.getName());
+      if (!fs.exists(storeHomeDir)) continue;
+
+      FileStatus[] hfilesStatus = fs.listStatus(storeHomeDir);
+
+      for (FileStatus hfileStatus : hfilesStatus) {
+        HDFSBlocksDistribution storeFileBlocksDistribution =
+          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0,
+          hfileStatus.getLen());
+        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
+      }
+    }
+    return hdfsBlocksDistribution;
+  }
+
   public AtomicLong getMemstoreSize() {
     return memstoreSize;
   }
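HRegion.getHDFSBlocksDistribution() above merges the per-StoreFile distributions with HDFSBlocksDistribution.add(), which accumulates both the per-host weights and the unique-block total. A sketch of that aggregation with made-up store files and weights:

    import org.apache.hadoop.hbase.HDFSBlocksDistribution;

    public class RegionLocalityExample {
      public static void main(String[] args) {
        HDFSBlocksDistribution storeFile1 = new HDFSBlocksDistribution();
        storeFile1.addHostsAndBlockWeight(new String[] {"rs1", "rs2"}, 1024);

        HDFSBlocksDistribution storeFile2 = new HDFSBlocksDistribution();
        storeFile2.addHostsAndBlockWeight(new String[] {"rs2", "rs3"}, 2048);

        // Combine the two store files into a region-level view.
        HDFSBlocksDistribution region = new HDFSBlocksDistribution();
        region.add(storeFile1);
        region.add(storeFile2);

        // rs2 holds all 3072 bytes of unique blocks; rs1 holds 1024 of 3072.
        System.out.println(region.getBlockLocalityIndex("rs2")); // 1.0
        System.out.println(region.getBlockLocalityIndex("rs1")); // ~0.33
      }
    }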
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 86652c08e05..f22fb6e9a8f 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
@@ -1210,6 +1211,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     int readRequestsCount = 0;
     int writeRequestsCount = 0;
     long storefileIndexSize = 0;
+    HDFSBlocksDistribution hdfsBlocksDistribution =
+      new HDFSBlocksDistribution();
     long totalStaticIndexSize = 0;
     long totalStaticBloomSize = 0;
     for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
@@ -1227,6 +1230,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
           totalStaticBloomSize += store.getTotalStaticBloomSize();
         }
       }
+
+      hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
     }
     this.metrics.stores.set(stores);
     this.metrics.storefiles.set(storefiles);
@@ -1258,6 +1263,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       percent = (int) (ratio * 100);
       this.metrics.blockCacheHitCachingRatio.set(percent);
     }
+    float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(
+      getServerName().getHostname());
+    int percent = (int) (localityIndex * 100);
+    this.metrics.hdfsBlocksLocalityIndex.set(percent);
+
   }
 
   /**
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 94c8bb46f8c..7b7bf737cb7 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -29,7 +29,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.SortedSet;
@@ -43,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.client.Scan;
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableUtils;
@@ -143,6 +144,9 @@ public class StoreFile {
   // Is this from an in-memory store
   private boolean inMemory;
 
+  // HDFS blocks distribution information
+  private HDFSBlocksDistribution hdfsBlocksDistribution;
+
   // Keys for metadata stored in backing HFile.
   // Set when we obtain a Reader.
   private long sequenceid = -1;
@@ -384,6 +388,84 @@ public class StoreFile {
     return blockcache ? getBlockCache(conf) : null;
   }
 
+
+  /**
+   * @return the cached value of HDFS blocks distribution. The cached value is
+   * calculated when the store file is opened.
+   */
+  public HDFSBlocksDistribution getHDFSBlockDistribution() {
+    return this.hdfsBlocksDistribution;
+  }
+
+  /**
+   * Helper function to compute the HDFS blocks distribution of a given
+   * reference file. For a reference file, we don't compute the exact value;
+   * we use an estimate instead, which should be good enough. We assume the
+   * bottom part corresponds to the first half of the referred-to file and the
+   * top part to the second half. This is only an estimate: the midkey of the
+   * region is not necessarily the midkey of the HFile, and the number and
+   * size of keys vary. If this estimate isn't good enough, we can improve it
+   * later.
+   * @param fs The FileSystem
+   * @param reference The reference
+   * @param referencePath The path of the reference file
+   * @return HDFS blocks distribution
+   */
+  static private HDFSBlocksDistribution computeRefFileHDFSBlockDistribution(
+    FileSystem fs, Reference reference, Path referencePath) throws IOException {
+    if (referencePath == null) {
+      return null;
+    }
+
+    FileStatus status = fs.getFileStatus(referencePath);
+    long start = 0;
+    long length = 0;
+
+    if (Reference.isTopFileRegion(reference.getFileRegion())) {
+      start = status.getLen()/2;
+      length = status.getLen() - status.getLen()/2;
+    } else {
+      start = 0;
+      length = status.getLen()/2;
+    }
+    return FSUtils.computeHDFSBlocksDistribution(fs, status, start, length);
+  }
+
+  /**
+   * Helper function to compute the HDFS blocks distribution of a given file.
+   * For a reference file, it is an estimate.
+   * @param fs The FileSystem
+   * @param p The path of the file
+   * @return HDFS blocks distribution
+   */
+  static public HDFSBlocksDistribution computeHDFSBlockDistribution(
+    FileSystem fs, Path p) throws IOException {
+    if (isReference(p)) {
+      Reference reference = Reference.read(fs, p);
+      Path referencePath = getReferredToFile(p);
+      return computeRefFileHDFSBlockDistribution(fs, reference, referencePath);
+    } else {
+      FileStatus status = fs.getFileStatus(p);
+      long length = status.getLen();
+      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
+    }
+  }
+
+
+  /**
+   * Compute the HDFS block distribution; for a reference file, it is an
+   * estimate.
+   */
+  private void computeHDFSBlockDistribution() throws IOException {
+    if (isReference()) {
+      this.hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(
+        this.fs, this.reference, this.referencePath);
+    } else {
+      FileStatus status = this.fs.getFileStatus(this.path);
+      long length = status.getLen();
+      this.hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(
+        this.fs, status, 0, length);
+    }
+  }
+
   /**
    * Opens reader on this store file. Called by Constructor.
    * @return Reader for the store file.
@@ -402,6 +484,9 @@ public class StoreFile {
         this.inMemory,
         this.conf.getBoolean(HFile.EVICT_BLOCKS_ON_CLOSE_KEY, true));
     }
+
+    computeHDFSBlockDistribution();
+
     // Load up indices and fileinfo.
     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
     // Read in our metadata.
@@ -1235,5 +1320,4 @@ public class StoreFile {
       }
     });
   }
-
 }
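The half-split estimate above reduces to a small piece of arithmetic; as a sanity check (the file length is hypothetical):

    public class RefFileEstimateExample {
      public static void main(String[] args) {
        long fileLen = 3 * 1024;  // length of the referred-to HFile
        boolean isTop = true;     // what Reference.isTopFileRegion(...) reports

        long start = isTop ? fileLen / 2 : 0;
        long length = isTop ? fileLen - fileLen / 2 : fileLen / 2;

        // This (start, length) range is then handed to
        // FSUtils.computeHDFSBlocksDistribution(fs, status, start, length).
        System.out.println("start=" + start + " length=" + length);
        // prints: start=1536 length=1536
      }
    }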
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java b/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
index 324a0215ad0..88fcc7415b5 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
@@ -155,6 +155,12 @@ public class RegionServerMetrics implements Updater {
   public final MetricsIntValue totalStaticBloomSizeKB =
     new MetricsIntValue("totalStaticBloomSizeKB", registry);
 
+  /**
+   * HDFS blocks locality index
+   */
+  public final MetricsIntValue hdfsBlocksLocalityIndex =
+    new MetricsIntValue("hdfsBlocksLocalityIndex", registry);
+
   /**
    * Sum of all the memstore sizes in this regionserver in MB
    */
@@ -282,6 +288,7 @@ public class RegionServerMetrics implements Updater {
       this.blockCacheEvictedCount.pushMetric(this.metricsRecord);
       this.blockCacheHitRatio.pushMetric(this.metricsRecord);
       this.blockCacheHitCachingRatio.pushMetric(this.metricsRecord);
+      this.hdfsBlocksLocalityIndex.pushMetric(this.metricsRecord);
 
       // Mix in HFile and HLog metrics
       // Be careful.  Here is code for MTVR from up in hadoop:
@@ -407,6 +414,8 @@ public class RegionServerMetrics implements Updater {
       Long.valueOf(this.blockCacheHitRatio.get())+"%");
     sb = Strings.appendKeyValue(sb, this.blockCacheHitCachingRatio.getName(),
       Long.valueOf(this.blockCacheHitCachingRatio.get())+"%");
+    sb = Strings.appendKeyValue(sb, this.hdfsBlocksLocalityIndex.getName(),
+      Long.valueOf(this.hdfsBlocksLocalityIndex.get()));
     return sb.toString();
   }
 }
diff --git a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 07294104cf2..1c6f59be768 100644
--- a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -29,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -488,6 +490,32 @@ public abstract class FSUtils {
     return fs.exists(rootRegionDir);
   }
 
+
+  /**
+   * Compute the HDFS blocks distribution of a given file, or a portion of
+   * the file
+   * @param fs file system
+   * @param status file status of the file
+   * @param start start position of the portion
+   * @param length length of the portion
+   * @return The HDFS blocks distribution
+   */
+  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
+    final FileSystem fs, FileStatus status, long start, long length)
+    throws IOException {
+    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
+    BlockLocation [] blockLocations =
+      fs.getFileBlockLocations(status, start, length);
+    for(BlockLocation bl : blockLocations) {
+      String [] hosts = bl.getHosts();
+      long len = bl.getLength();
+      blocksDistribution.addHostsAndBlockWeight(hosts, len);
+    }
+
+    return blocksDistribution;
+  }
+
+
+
   /**
    * Runs through the hbase rootdir and checks all stores have only
    * one file in them -- that is, they've been major compacted.  Looks
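The FSUtils helper above is the primitive everything else builds on. A minimal usage sketch (the path is hypothetical; on a local filesystem every block reports its location as "localhost"):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HDFSBlocksDistribution;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class FSUtilsExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FileStatus status = fs.getFileStatus(new Path("/tmp/some-hfile"));
        // Whole file: offset 0 through the file length.
        HDFSBlocksDistribution dist =
          FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
        System.out.println("unique weight: " + dist.getUniqueBlocksTotalWeight());
        System.out.println("locality: " + dist.getBlockLocalityIndex("localhost"));
      }
    }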
diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 88d6ad7e697..735d84f1f37 100644
--- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -223,7 +223,27 @@ public class HBaseTestingUtility {
    * @return The mini dfs cluster created.
    */
   public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
-    return startMiniDFSCluster(servers, null);
+    return startMiniDFSCluster(servers, null, null);
+  }
+
+  /**
+   * Start a minidfscluster.
+   * This is useful if you want to run datanodes on distinct hosts for things
+   * like HDFS block location verification.
+   * If you start MiniDFSCluster without host names, all instances of the
+   * datanodes will have the same host name.
+   * @param hosts hostnames for the DNs to run on.
+   * @throws Exception
+   * @see {@link #shutdownMiniDFSCluster()}
+   * @return The mini dfs cluster created.
+   */
+  public MiniDFSCluster startMiniDFSCluster(final String hosts[])
+  throws Exception {
+    if (hosts != null && hosts.length != 0) {
+      return startMiniDFSCluster(hosts.length, null, hosts);
+    } else {
+      return startMiniDFSCluster(1, null, null);
+    }
   }
 
   /**
@@ -236,6 +256,22 @@ public class HBaseTestingUtility {
    * @return The mini dfs cluster created.
    */
   public MiniDFSCluster startMiniDFSCluster(int servers, final File dir)
+  throws Exception {
+    return startMiniDFSCluster(servers, dir, null);
+  }
+
+
+  /**
+   * Start a minidfscluster.
+   * Can only create one.
+   * @param servers How many DNs to start.
+   * @param dir Where to home your dfs cluster.
+   * @param hosts hostnames for the DNs to run on.
+   * @throws Exception
+   * @see {@link #shutdownMiniDFSCluster()}
+   * @return The mini dfs cluster created.
+   */
+  public MiniDFSCluster startMiniDFSCluster(int servers, final File dir, final String hosts[])
   throws Exception {
     // This does the following to home the minidfscluster
     // base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/");
@@ -249,7 +285,7 @@ public class HBaseTestingUtility {
     System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.toString());
     System.setProperty("test.cache.data", this.clusterTestBuildDir.toString());
     this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
-      true, null, null, null, null);
+      true, null, null, hosts, null);
     // Set this just-started cluster as our filesystem.
     FileSystem fs = this.dfsCluster.getFileSystem();
     this.conf.set("fs.defaultFS", fs.getUri().toString());
@@ -356,6 +392,20 @@ public class HBaseTestingUtility {
     return startMiniCluster(1, numSlaves);
   }
 
+
+  /**
+   * Start a minicluster.
+   * @throws Exception
+   * @see {@link #shutdownMiniCluster()}
+   * @return Mini hbase cluster instance created.
+   */
+  public MiniHBaseCluster startMiniCluster(final int numMasters,
+    final int numSlaves)
+  throws Exception {
+    return startMiniCluster(numMasters, numSlaves, null);
+  }
+
+
   /**
    * Start up a minicluster of hbase, optionally dfs, and zookeeper.
    * Modifies Configuration.  Homes the cluster data directory under a random
@@ -365,18 +415,31 @@
    * hbase masters.  If numMasters > 1, you can find the active/primary master
    * with {@link MiniHBaseCluster#getMaster()}.
    * @param numSlaves Number of slaves to start up.  We'll start this many
-   * datanodes and regionservers.  If numSlaves is > 1, then make sure
+   * regionservers.  If dataNodeHosts == null, this also indicates the number
+   * of datanodes to start.  If dataNodeHosts != null, the number of datanodes
+   * is based on dataNodeHosts.length.
+   * If numSlaves is > 1, then make sure
    * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
    * bind errors.
+   * @param dataNodeHosts hostnames for the DNs to run on.
+   * This is useful if you want to run datanodes on distinct hosts for things
+   * like HDFS block location verification.
+   * If you start MiniDFSCluster without host names,
+   * all instances of the datanodes will have the same host name.
    * @throws Exception
    * @see {@link #shutdownMiniCluster()}
    * @return Mini hbase cluster instance created.
    */
   public MiniHBaseCluster startMiniCluster(final int numMasters,
-    final int numSlaves)
+    final int numSlaves, final String[] dataNodeHosts)
   throws Exception {
+    int numDataNodes = numSlaves;
+    if (dataNodeHosts != null && dataNodeHosts.length != 0) {
+      numDataNodes = dataNodeHosts.length;
+    }
+
     LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
-      numSlaves + " regionserver(s) and datanode(s)");
+      numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)");
     // If we already put up a cluster, fail.
     String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
     isRunningCluster(testBuildPath);
@@ -390,7 +453,7 @@ public class HBaseTestingUtility {
     System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath());
     // Bring up mini dfs cluster. This spews a bunch of warnings about missing
     // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
-    startMiniDFSCluster(numSlaves, this.clusterTestBuildDir);
+    startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts);
     this.dfsCluster.waitClusterUp();
 
     // Start up a zk cluster.
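The new overloads are meant to be used together, as the tests that follow do; a minimal sketch (host names hypothetical):

    import org.apache.hadoop.hbase.HBaseTestingUtility;

    public class MiniClusterExample {
      public static void main(String[] args) throws Exception {
        HBaseTestingUtility htu = new HBaseTestingUtility();
        // 1 master, 3 region servers, and one datanode per listed hostname,
        // so HDFS block locations can be asserted against distinct hosts.
        String[] dataNodeHosts = new String[] { "host1", "host2", "host3" };
        htu.startMiniCluster(1, 3, dataNodeHosts);
        try {
          // ... exercise locality assertions here ...
        } finally {
          htu.shutdownMiniCluster();
        }
      }
    }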
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index ec8afe9713c..a243011ef67 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,6 +34,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -40,17 +43,21 @@ import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -69,6 +76,7 @@ import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
@@ -2962,6 +2970,64 @@ public class TestHRegion extends HBaseTestCase {
     assertTrue(keyValues.length == 0);
   }
 
+  @Test
+  public void testgetHDFSBlocksDistribution() throws Exception {
+    HBaseTestingUtility htu = new HBaseTestingUtility();
+    final int DEFAULT_BLOCK_SIZE = 1024;
+    htu.getConfiguration().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    htu.getConfiguration().setInt("dfs.replication", 2);
+
+
+    // set up a cluster with 3 nodes
+    MiniHBaseCluster cluster;
+    String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
+    int regionServersCount = 3;
+
+    try {
+      cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
+      byte [][] families = {fam1, fam2};
+      HTable ht = htu.createTable(Bytes.toBytes(this.getName()), families);
+
+      //Setting up region
+      byte row[] = Bytes.toBytes("row1");
+      byte col[] = Bytes.toBytes("col1");
+
+      Put put = new Put(row);
+      put.add(fam1, col, 1, Bytes.toBytes("test1"));
+      put.add(fam2, col, 1, Bytes.toBytes("test2"));
+      ht.put(put);
+
+      HRegion firstRegion = htu.getHbaseCluster().
+        getRegions(Bytes.toBytes(this.getName())).get(0);
+      firstRegion.flushcache();
+      HDFSBlocksDistribution blocksDistribution1 =
+        firstRegion.getHDFSBlocksDistribution();
+
+      // given the replication factor is 2 and we have 2 HFiles,
+      // we will have a total of 4 block replicas on 3 datanodes; thus at
+      // least one host must hold replicas of both HFiles.  That host's
+      // weight will equal the unique-block weight.
+      long uniqueBlocksWeight1 =
+        blocksDistribution1.getUniqueBlocksTotalWeight();
+
+      String topHost = blocksDistribution1.getTopHosts().get(0);
+      long topHostWeight = blocksDistribution1.getWeight(topHost);
+      assertTrue(uniqueBlocksWeight1 == topHostWeight);
+
+      // use the static method to compute the value; it should be the same.
+      // the static method is used by the load balancer and other components
+      HDFSBlocksDistribution blocksDistribution2 =
+        HRegion.computeHDFSBlocksDistribution(htu.getConfiguration(),
+        firstRegion.getTableDesc(),
+        firstRegion.getRegionInfo().getEncodedName());
+      long uniqueBlocksWeight2 =
+        blocksDistribution2.getUniqueBlocksTotalWeight();
+
+      assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
+    } finally {
+      htu.shutdownMiniCluster();
+    }
+  }
+
   private void putData(int startRow, int numRows, byte [] qf,
     byte [] ...families)
   throws IOException {
diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index a0df53d1844..1ad30e62b45 100644
--- a/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -23,6 +23,11 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.Test;
 
@@ -30,6 +35,7 @@ import org.junit.Test;
  * Test {@link FSUtils}.
  */
 public class TestFSUtils {
+
   @Test public void testIsHDFS() throws Exception {
     HBaseTestingUtility htu = new HBaseTestingUtility();
     htu.getConfiguration().setBoolean("dfs.support.append", false);
@@ -44,4 +50,100 @@ public class TestFSUtils {
       if (cluster != null) cluster.shutdown();
     }
   }
+
+  private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize)
+    throws Exception {
+    FSDataOutputStream out = fs.create(file);
+    byte [] data = new byte[dataSize];
+    out.write(data, 0, dataSize);
+    out.close();
+  }
+
+  @Test public void testcomputeHDFSBlocksDistribution() throws Exception {
+    HBaseTestingUtility htu = new HBaseTestingUtility();
+    final int DEFAULT_BLOCK_SIZE = 1024;
+    htu.getConfiguration().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    MiniDFSCluster cluster = null;
+    Path testFile = null;
+
+    try {
+      // set up a cluster with 3 nodes
+      String hosts[] = new String[] { "host1", "host2", "host3" };
+      cluster = htu.startMiniDFSCluster(hosts);
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      // create a file with two blocks
+      testFile = new Path("/test1.txt");
+      WriteDataToHDFS(fs, testFile, 2*DEFAULT_BLOCK_SIZE);
+
+      // given the default replication factor is 3, the same as the number of
+      // datanodes, the locality index for each host should be 100%; that is,
+      // getWeight for each host should equal getUniqueBlocksTotalWeight
+      FileStatus status = fs.getFileStatus(testFile);
+      HDFSBlocksDistribution blocksDistribution =
+        FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+      long uniqueBlocksTotalWeight =
+        blocksDistribution.getUniqueBlocksTotalWeight();
+      for (String host : hosts) {
+        long weight = blocksDistribution.getWeight(host);
+        assertTrue(uniqueBlocksTotalWeight == weight);
+      }
+    } finally {
+      htu.shutdownMiniDFSCluster();
+    }
+
+
+    try {
+      // set up a cluster with 4 nodes
+      String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
+      cluster = htu.startMiniDFSCluster(hosts);
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      // create a file with three blocks
+      testFile = new Path("/test2.txt");
+      WriteDataToHDFS(fs, testFile, 3*DEFAULT_BLOCK_SIZE);
+
+      // given the default replication factor is 3, we will have a total of 9
+      // block replicas; thus the host with the highest weight should have
+      // weight == 3 * DEFAULT_BLOCK_SIZE
+      FileStatus status = fs.getFileStatus(testFile);
+      HDFSBlocksDistribution blocksDistribution =
+        FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+      long uniqueBlocksTotalWeight =
+        blocksDistribution.getUniqueBlocksTotalWeight();
+
+      String tophost = blocksDistribution.getTopHosts().get(0);
+      long weight = blocksDistribution.getWeight(tophost);
+      assertTrue(uniqueBlocksTotalWeight == weight);
+
+    } finally {
+      htu.shutdownMiniDFSCluster();
+    }
+
+
+    try {
+      // set up a cluster with 4 nodes
+      String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
+      cluster = htu.startMiniDFSCluster(hosts);
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      // create a file with one block
+      testFile = new Path("/test3.txt");
+      WriteDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE);
+
+      // given the default replication factor is 3, we will have a total of 3
+      // block replicas; thus one of the four hosts will have no weight
+      FileStatus status = fs.getFileStatus(testFile);
+      HDFSBlocksDistribution blocksDistribution =
+        FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+      assertTrue(blocksDistribution.getTopHosts().size()
== 3); + } finally { + htu.shutdownMiniDFSCluster(); + } + + } + } \ No newline at end of file
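The invariant the last test checks can also be seen without a cluster; a quick sanity check against HDFSBlocksDistribution alone (hypothetical hosts):

    import org.apache.hadoop.hbase.HDFSBlocksDistribution;

    public class TopHostsSanityCheck {
      public static void main(String[] args) {
        // one block, replicated on three of four hosts
        HDFSBlocksDistribution dist = new HDFSBlocksDistribution();
        dist.addHostsAndBlockWeight(new String[] {"host1", "host2", "host3"}, 1024);
        System.out.println(dist.getTopHosts().size()); // 3 -- host4 never appears
        System.out.println(dist.getWeight("host4"));   // 0
      }
    }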