HBASE-4114 Metrics for HFile HDFS block locality
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1156390 13f79535-47bb-0310-9956-ffa450edef68
parent 03019ced6a
commit 3ab49af40f
@ -421,6 +421,7 @@ Release 0.91.0 - Unreleased
               (Nichole Treadway and Nicholas Telford)
   HBASE-2233  Support both Hadoop 0.20 and 0.22
   HBASE-3857  Change the HFile Format (Mikhail & Liyin)
   HBASE-4114  Metrics for HFile HDFS block locality (Ming Ma)

Release 0.90.5 - Unreleased
@ -0,0 +1,234 @@
/**
 * Copyright 2011 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;


/**
 * Data structure to describe the distribution of HDFS blocks among hosts
 */
public class HDFSBlocksDistribution {
  private Map<String,HostAndWeight> hostAndWeights = null;
  private long uniqueBlocksTotalWeight = 0;

  /**
   * Stores the hostname and weight for that hostname.
   *
   * This is used when determining the physical locations of the blocks making
   * up a region.
   *
   * To make a prioritized list of the hosts holding the most data of a region,
   * this class is used to count the total weight for each host.  The weight is
   * currently just the size of the file.
   */
  public static class HostAndWeight {

    private String host;
    private long weight;

    /**
     * Constructor
     * @param host the host name
     * @param weight the weight
     */
    public HostAndWeight(String host, long weight) {
      this.host = host;
      this.weight = weight;
    }

    /**
     * add weight
     * @param weight the weight
     */
    public void addWeight(long weight) {
      this.weight += weight;
    }

    /**
     * @return the host name
     */
    public String getHost() {
      return host;
    }

    /**
     * @return the weight
     */
    public long getWeight() {
      return weight;
    }

    /**
     * comparator used to sort hosts based on weight
     */
    public static class WeightComparator implements Comparator<HostAndWeight> {
      @Override
      public int compare(HostAndWeight l, HostAndWeight r) {
        if(l.getWeight() == r.getWeight()) {
          return l.getHost().compareTo(r.getHost());
        }
        return l.getWeight() < r.getWeight() ? -1 : 1;
      }
    }
  }

  /**
   * Constructor
   */
  public HDFSBlocksDistribution() {
    this.hostAndWeights =
      new TreeMap<String,HostAndWeight>();
  }

  /**
   * @see java.lang.Object#toString()
   */
  @Override
  public synchronized String toString() {
    return "number of unique hosts in the distribution=" +
      this.hostAndWeights.size();
  }

  /**
   * add some weight to a list of hosts, update the value of unique block weight
   * @param hosts the list of hosts
   * @param weight the weight
   */
  public void addHostsAndBlockWeight(String[] hosts, long weight) {
    if (hosts == null || hosts.length == 0) {
      throw new NullPointerException("empty hosts");
    }
    addUniqueWeight(weight);
    for (String hostname : hosts) {
      addHostAndBlockWeight(hostname, weight);
    }
  }

  /**
   * add some weight to the total unique weight
   * @param weight the weight
   */
  private void addUniqueWeight(long weight) {
    uniqueBlocksTotalWeight += weight;
  }


  /**
   * add some weight to a specific host
   * @param host the host name
   * @param weight the weight
   */
  private void addHostAndBlockWeight(String host, long weight) {
    if (host == null) {
      throw new NullPointerException("Passed hostname is null");
    }

    HostAndWeight hostAndWeight = this.hostAndWeights.get(host);
    if(hostAndWeight == null) {
      hostAndWeight = new HostAndWeight(host, weight);
      this.hostAndWeights.put(host, hostAndWeight);
    } else {
      hostAndWeight.addWeight(weight);
    }
  }

  /**
   * @return the hosts and their weights
   */
  public Map<String,HostAndWeight> getHostAndWeights() {
    return this.hostAndWeights;
  }

  /**
   * return the weight for a specific host, that will be the total bytes of all
   * blocks on the host
   * @param host the host name
   * @return the weight of the given host
   */
  public long getWeight(String host) {
    long weight = 0;
    if (host != null) {
      HostAndWeight hostAndWeight = this.hostAndWeights.get(host);
      if(hostAndWeight != null) {
        weight = hostAndWeight.getWeight();
      }
    }
    return weight;
  }

  /**
   * @return the sum of all unique blocks' weight
   */
  public long getUniqueBlocksTotalWeight() {
    return uniqueBlocksTotalWeight;
  }

  /**
   * return the locality index of a given host
   * @param host the host name
   * @return the locality index of the given host
   */
  public float getBlockLocalityIndex(String host) {
    float localityIndex = 0;
    HostAndWeight hostAndWeight = this.hostAndWeights.get(host);
    if (hostAndWeight != null && uniqueBlocksTotalWeight != 0) {
      localityIndex = (float)hostAndWeight.weight / (float)uniqueBlocksTotalWeight;
    }
    return localityIndex;
  }


  /**
   * This will add the distribution from input to this object
   * @param otherBlocksDistribution the other hdfs blocks distribution
   */
  public void add(HDFSBlocksDistribution otherBlocksDistribution) {
    Map<String,HostAndWeight> otherHostAndWeights =
      otherBlocksDistribution.getHostAndWeights();
    for (Map.Entry<String, HostAndWeight> otherHostAndWeight:
      otherHostAndWeights.entrySet()) {
      addHostAndBlockWeight(otherHostAndWeight.getValue().host,
        otherHostAndWeight.getValue().weight);
    }
    addUniqueWeight(otherBlocksDistribution.getUniqueBlocksTotalWeight());
  }

  /**
   * return the sorted list of hosts in terms of their weights
   */
  public List<String> getTopHosts() {
    NavigableSet<HostAndWeight> orderedHosts = new TreeSet<HostAndWeight>(
      new HostAndWeight.WeightComparator());
    orderedHosts.addAll(this.hostAndWeights.values());
    List<String> topHosts = new ArrayList<String>(orderedHosts.size());
    for(HostAndWeight haw : orderedHosts.descendingSet()) {
      topHosts.add(haw.getHost());
    }
    return topHosts;
  }

}
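The class above is self-contained, so its behavior is easy to illustrate. A minimal usage sketch (not part of the patch; host names and block sizes are invented): each block's length is counted once toward the unique-block total and once per host holding a replica, and a host's locality index is its weight divided by that total.

import java.util.List;

// Illustration only: exercises the new HDFSBlocksDistribution class.
public class HDFSBlocksDistributionExample {
  public static void main(String[] args) {
    org.apache.hadoop.hbase.HDFSBlocksDistribution dist =
      new org.apache.hadoop.hbase.HDFSBlocksDistribution();
    long blockSize = 64L * 1024 * 1024;
    // block 1 has replicas on host1 and host2
    dist.addHostsAndBlockWeight(new String[] {"host1", "host2"}, blockSize);
    // block 2 has replicas on host1 and host3
    dist.addHostsAndBlockWeight(new String[] {"host1", "host3"}, blockSize);

    // unique weight = 2 blocks = 128MB; host1 holds both, host2/host3 one each
    System.out.println(dist.getUniqueBlocksTotalWeight());   // 134217728
    System.out.println(dist.getBlockLocalityIndex("host1")); // 1.0
    System.out.println(dist.getBlockLocalityIndex("host2")); // 0.5
    List<String> topHosts = dist.getTopHosts();
    System.out.println(topHosts.get(0));                     // host1
  }
}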
@ -466,6 +466,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
    status.setStatus("Starting assignment manager");
    this.assignmentManager.joinCluster();

    this.balancer.setClusterStatus(getClusterStatus());
    this.balancer.setMasterServices(this);

    // Start balancer and meta catalog janitor after meta and regions have
    // been assigned.
    status.setStatus("Starting balancer and catalog janitor");
@ -19,29 +19,32 @@
 */
package org.apache.hadoop.hbase.master;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.common.collect.MinMaxPriorityQueue;
@ -66,11 +69,23 @@ public class LoadBalancer {
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  // slop for regions
  private float slop;
  private Configuration config;
  private ClusterStatus status;
  private MasterServices services;

  LoadBalancer(Configuration conf) {
    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
    if (slop < 0) slop = 0;
    else if (slop > 1) slop = 1;
    this.config = conf;
  }

  public void setClusterStatus(ClusterStatus st) {
    this.status = st;
  }

  public void setMasterServices(MasterServices masterServices) {
    this.services = masterServices;
  }

  /*
@ -600,106 +615,96 @@ public class LoadBalancer {
  }

  /**
   * Find the block locations for all of the files for the specified region.
   *
   * Returns an ordered list of hosts that are hosting the blocks for this
   * region. The weight of each host is the sum of the block lengths of all
   * files on that host, so the first host in the list is the server which
   * holds the most bytes of the given region's HFiles.
   *
   * TODO: Make this work. Need to figure out how to match hadoop's hostnames
   * given for block locations with our HServerAddress.
   * TODO: Use the right directory for the region
   * TODO: Use getFileBlockLocations on the files not the directory
   *
   * @param fs the filesystem
   * @param region region
   * @return ordered list of hosts holding blocks of the specified region
   * @throws IOException if any filesystem errors
   */
  @SuppressWarnings("unused")
  private List<String> getTopBlockLocations(FileSystem fs, HRegionInfo region)
  throws IOException {
    String encodedName = region.getEncodedName();
    Path path = new Path("/hbase/table/" + encodedName);
    FileStatus status = fs.getFileStatus(path);
    BlockLocation [] blockLocations =
      fs.getFileBlockLocations(status, 0, status.getLen());
    Map<HostAndWeight,HostAndWeight> hostWeights =
      new TreeMap<HostAndWeight,HostAndWeight>(new HostAndWeight.HostComparator());
    for(BlockLocation bl : blockLocations) {
      String [] hosts = bl.getHosts();
      long len = bl.getLength();
      for(String host : hosts) {
        HostAndWeight haw = hostWeights.get(host);
        if(haw == null) {
          haw = new HostAndWeight(host, len);
          hostWeights.put(haw, haw);
        } else {
          haw.addWeight(len);
        }
  private List<ServerName> getTopBlockLocations(FileSystem fs,
    HRegionInfo region) {
    List<ServerName> topServerNames = null;
    try {
      HTableDescriptor tableDescriptor = getTableDescriptor(
        region.getTableName());
      if (tableDescriptor != null) {
        HDFSBlocksDistribution blocksDistribution =
          HRegion.computeHDFSBlocksDistribution(config, tableDescriptor,
          region.getEncodedName());
        List<String> topHosts = blocksDistribution.getTopHosts();
        topServerNames = mapHostNameToServerName(topHosts);
      }
    } catch (IOException ioe) {
      LOG.debug("IOException during HDFSBlocksDistribution computation. for " +
        "region = " + region.getEncodedName() , ioe);
    }
    NavigableSet<HostAndWeight> orderedHosts = new TreeSet<HostAndWeight>(
      new HostAndWeight.WeightComparator());
    orderedHosts.addAll(hostWeights.values());
    List<String> topHosts = new ArrayList<String>(orderedHosts.size());
    for(HostAndWeight haw : orderedHosts.descendingSet()) {
      topHosts.add(haw.getHost());
    }
    return topHosts;

    return topServerNames;
  }

  /**
   * Stores the hostname and weight for that hostname.
   *
   * This is used when determining the physical locations of the blocks making
   * up a region.
   *
   * To make a prioritized list of the hosts holding the most data of a region,
   * this class is used to count the total weight for each host. The weight is
   * currently just the size of the file.
   * return HTableDescriptor for a given tableName
   * @param tableName the table name
   * @return HTableDescriptor
   * @throws IOException
   */
  private static class HostAndWeight {

    private final String host;
    private long weight;

    public HostAndWeight(String host, long weight) {
      this.host = host;
      this.weight = weight;
  private HTableDescriptor getTableDescriptor(byte[] tableName)
  throws IOException {
    HTableDescriptor tableDescriptor = null;
    try {
      if ( this.services != null)
      {
        tableDescriptor = this.services.getTableDescriptors().
          get(Bytes.toString(tableName));
      }
    } catch (TableExistsException tee) {
      LOG.debug("TableExistsException during getTableDescriptors." +
        " Current table name = " + tableName , tee);
    } catch (FileNotFoundException fnfe) {
      LOG.debug("FileNotFoundException during getTableDescriptors." +
        " Current table name = " + tableName , fnfe);
    }

    public void addWeight(long weight) {
      this.weight += weight;
    return tableDescriptor;
    }

    public String getHost() {
      return host;
  /**
   * Map hostname to ServerName, The output ServerName list will have the same
   * order as input hosts.
   * @param hosts the list of hosts
   * @return ServerName list
   */
  private List<ServerName> mapHostNameToServerName(List<String> hosts) {
    if ( hosts == null || status == null) {
      return null;
    }

    public long getWeight() {
      return weight;
    }
    List<ServerName> topServerNames = new ArrayList<ServerName>();
    Collection<ServerName> regionServers = status.getServers();

    private static class HostComparator implements Comparator<HostAndWeight> {
      @Override
      public int compare(HostAndWeight l, HostAndWeight r) {
        return l.getHost().compareTo(r.getHost());
      }
    }

    private static class WeightComparator implements Comparator<HostAndWeight> {
      @Override
      public int compare(HostAndWeight l, HostAndWeight r) {
        if(l.getWeight() == r.getWeight()) {
          return l.getHost().compareTo(r.getHost());
    // create a mapping from hostname to ServerName for fast lookup
    HashMap<String, ServerName> hostToServerName =
      new HashMap<String, ServerName>();
    for (ServerName sn : regionServers) {
      hostToServerName.put(sn.getHostname(), sn);
    }
        return l.getWeight() < r.getWeight() ? -1 : 1;

    for (String host : hosts ) {
      ServerName sn = hostToServerName.get(host);
      // it is possible that HDFS is up ( thus host is valid ),
      // but RS is down ( thus sn is null )
      if (sn != null) {
        topServerNames.add(sn);
      }
    }
    return topServerNames;
  }


  /**
   * Generates an immediate assignment plan to be used by a new master for
   * regions in transition that do not have an already known destination.
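To make the new hostname-to-ServerName mapping above concrete, here is a small illustration (not part of the patch; host and server names are invented, and plain strings stand in for ServerName). It shows the two properties the code relies on: the output keeps the weight ordering of the input hosts, and hosts that store HFile blocks but run no region server are silently skipped.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: mirrors mapHostNameToServerName() with plain collections.
public class TopHostsToServersSketch {
  public static void main(String[] args) {
    // hosts ordered by descending HDFS block weight, e.g. from getTopHosts()
    List<String> topHosts = Arrays.asList("host2", "host1", "host3");
    // region servers the master currently knows about (from ClusterStatus)
    Map<String, String> hostToServerName = new HashMap<String, String>();
    hostToServerName.put("host1", "host1,60020,1");
    hostToServerName.put("host2", "host2,60020,2");
    // host3 stores blocks but runs no region server

    List<String> topServerNames = new ArrayList<String>();
    for (String host : topHosts) {
      String sn = hostToServerName.get(host);
      if (sn != null) {   // an HDFS host without a live region server is skipped
        topServerNames.add(sn);
      }
    }
    System.out.println(topServerNames); // [host2,60020,2, host1,60020,1]
  }
}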
@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@ -461,6 +462,61 @@ public class HRegion implements HeapSize { // , Writable{
    return false;
  }

  /**
   * This function will return the HDFS blocks distribution based on the data
   * captured when HFile is created
   * @return The HDFS blocks distribution for the region.
   */
  public HDFSBlocksDistribution getHDFSBlocksDistribution() {
    HDFSBlocksDistribution hdfsBlocksDistribution =
      new HDFSBlocksDistribution();
    synchronized (this.stores) {
      for (Store store : this.stores.values()) {
        for (StoreFile sf : store.getStorefiles()) {
          HDFSBlocksDistribution storeFileBlocksDistribution =
            sf.getHDFSBlockDistribution();
          hdfsBlocksDistribution.add(storeFileBlocksDistribution);
        }
      }
    }
    return hdfsBlocksDistribution;
  }

  /**
   * This is a helper function to compute HDFS block distribution on demand
   * @param conf configuration
   * @param tableDescriptor HTableDescriptor of the table
   * @param regionEncodedName encoded name of the region
   * @return The HDFS blocks distribution for the given region.
   * @throws IOException
   */
  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
    Configuration conf, HTableDescriptor tableDescriptor,
    String regionEncodedName) throws IOException {
    HDFSBlocksDistribution hdfsBlocksDistribution =
      new HDFSBlocksDistribution();
    Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf),
      tableDescriptor.getName());
    FileSystem fs = tablePath.getFileSystem(conf);

    for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
      Path storeHomeDir = Store.getStoreHomedir(tablePath, regionEncodedName,
        family.getName());
      if (!fs.exists(storeHomeDir)) continue;

      FileStatus[] hfilesStatus = null;
      hfilesStatus = fs.listStatus(storeHomeDir);

      for (FileStatus hfileStatus : hfilesStatus) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0,
            hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }
    }
    return hdfsBlocksDistribution;
  }

  public AtomicLong getMemstoreSize() {
    return memstoreSize;
  }
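For reference, a minimal sketch of calling the on-demand helper above, the way the load balancer and the new TestHRegion test do; conf, tableDesc and encodedName are assumed to exist and are not defined here.

// Sketch: compute a region's HDFS block distribution without opening the region.
HDFSBlocksDistribution dist =
  HRegion.computeHDFSBlocksDistribution(conf, tableDesc, encodedName);
long totalWeight = dist.getUniqueBlocksTotalWeight(); // bytes across all HFiles
float locality = dist.getBlockLocalityIndex("host1"); // fraction stored on "host1"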
@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
@ -1210,6 +1211,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
    int readRequestsCount = 0;
    int writeRequestsCount = 0;
    long storefileIndexSize = 0;
    HDFSBlocksDistribution hdfsBlocksDistribution =
      new HDFSBlocksDistribution();
    long totalStaticIndexSize = 0;
    long totalStaticBloomSize = 0;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
@ -1227,6 +1230,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
          totalStaticBloomSize += store.getTotalStaticBloomSize();
        }
      }

      hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
    }
    this.metrics.stores.set(stores);
    this.metrics.storefiles.set(storefiles);
@ -1258,6 +1263,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
      percent = (int) (ratio * 100);
      this.metrics.blockCacheHitCachingRatio.set(percent);
    }
    float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(
      getServerName().getHostname());
    int percent = (int) (localityIndex * 100);
    this.metrics.hdfsBlocksLocalityIndex.set(percent);

  }

  /**
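A quick worked example of the metric computation above, with invented numbers: if the region server's host stores replicas for 1.5 GB out of 2 GB of unique HFile bytes across its online regions, the locality index is 0.75 and the published hdfsBlocksLocalityIndex gauge reads 75.

// Illustration only, invented numbers.
float localityIndex = 1.5f / 2.0f;         // what getBlockLocalityIndex would return
int percent = (int) (localityIndex * 100); // 75, the value set on hdfsBlocksLocalityIndex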
@ -29,7 +29,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedSet;
@ -43,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.client.Scan;
@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableUtils;
@ -143,6 +144,9 @@ public class StoreFile {
  // Is this from an in-memory store
  private boolean inMemory;

  // HDFS blocks distribution information
  private HDFSBlocksDistribution hdfsBlocksDistribution;

  // Keys for metadata stored in backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;
@ -384,6 +388,84 @@ public class StoreFile {
    return blockcache ? getBlockCache(conf) : null;
  }


  /**
   * @return the cached value of HDFS blocks distribution. The cached value is
   * calculated when store file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.hdfsBlocksDistribution;
  }

  /**
   * helper function to compute HDFS blocks distribution of a given reference
   * file. For reference files, we don't compute the exact value. We use an
   * estimate instead, given it might be good enough. We assume the bottom part
   * takes the first half of the reference file and the top part takes the
   * second half of the reference file. This is just an estimate, given
   * midkey of region != midkey of HFile, and the number and size of keys vary.
   * If this estimate isn't good enough, we can improve it later.
   * @param fs The FileSystem
   * @param reference The reference
   * @param referencePath The path of the reference file
   * @return HDFS blocks distribution
   */
  static private HDFSBlocksDistribution computeRefFileHDFSBlockDistribution(
    FileSystem fs, Reference reference, Path referencePath) throws IOException {
    if (referencePath == null) {
      return null;
    }

    FileStatus status = fs.getFileStatus(referencePath);
    long start = 0;
    long length = 0;

    if (Reference.isTopFileRegion(reference.getFileRegion())) {
      start = status.getLen()/2;
      length = status.getLen() - status.getLen()/2;
    } else {
      start = 0;
      length = status.getLen()/2;
    }
    return FSUtils.computeHDFSBlocksDistribution(fs, status, start, length);
  }

  /**
   * helper function to compute HDFS blocks distribution of a given file.
   * For reference files, it is an estimate
   * @param fs The FileSystem
   * @param p The path of the file
   * @return HDFS blocks distribution
   */
  static public HDFSBlocksDistribution computeHDFSBlockDistribution(
    FileSystem fs, Path p) throws IOException {
    if (isReference(p)) {
      Reference reference = Reference.read(fs, p);
      Path referencePath = getReferredToFile(p);
      return computeRefFileHDFSBlockDistribution(fs, reference, referencePath);
    } else {
      FileStatus status = fs.getFileStatus(p);
      long length = status.getLen();
      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
    }
  }


  /**
   * compute HDFS block distribution, for reference file, it is an estimate
   */
  private void computeHDFSBlockDistribution() throws IOException {
    if (isReference()) {
      this.hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(
        this.fs, this.reference, this.referencePath);
    } else {
      FileStatus status = this.fs.getFileStatus(this.path);
      long length = status.getLen();
      this.hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(
        this.fs, status, 0, length);
    }
  }

  /**
   * Opens reader on this store file. Called by Constructor.
   * @return Reader for the store file.
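To illustrate the reference-file estimate above with an invented file length: for a 1001-byte referred-to file, the bottom half is treated as bytes [0, 500) and the top half as bytes [500, 1001), so the two halves always cover the whole file even when the length is odd.

// Sketch of the split arithmetic used by computeRefFileHDFSBlockDistribution.
long len = 1001L;                                   // invented file length
long bottomStart = 0, bottomLength = len / 2;       // bytes [0, 500)
long topStart = len / 2, topLength = len - len / 2; // bytes [500, 1001)
assert bottomLength + topLength == len;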
@ -402,6 +484,9 @@ public class StoreFile {
        this.inMemory,
        this.conf.getBoolean(HFile.EVICT_BLOCKS_ON_CLOSE_KEY, true));
    }

    computeHDFSBlockDistribution();

    // Load up indices and fileinfo.
    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
    // Read in our metadata.
@ -1235,5 +1320,4 @@ public class StoreFile {
      }
    });
  }

}
@ -155,6 +155,12 @@ public class RegionServerMetrics implements Updater {
  public final MetricsIntValue totalStaticBloomSizeKB =
    new MetricsIntValue("totalStaticBloomSizeKB", registry);

  /**
   * HDFS blocks locality index
   */
  public final MetricsIntValue hdfsBlocksLocalityIndex =
    new MetricsIntValue("hdfsBlocksLocalityIndex", registry);

  /**
   * Sum of all the memstore sizes in this regionserver in MB
   */
@ -282,6 +288,7 @@ public class RegionServerMetrics implements Updater {
    this.blockCacheEvictedCount.pushMetric(this.metricsRecord);
    this.blockCacheHitRatio.pushMetric(this.metricsRecord);
    this.blockCacheHitCachingRatio.pushMetric(this.metricsRecord);
    this.hdfsBlocksLocalityIndex.pushMetric(this.metricsRecord);

    // Mix in HFile and HLog metrics
    // Be careful. Here is code for MTVR from up in hadoop:
@ -407,6 +414,8 @@ public class RegionServerMetrics implements Updater {
      Long.valueOf(this.blockCacheHitRatio.get())+"%");
    sb = Strings.appendKeyValue(sb, this.blockCacheHitCachingRatio.getName(),
      Long.valueOf(this.blockCacheHitCachingRatio.get())+"%");
    sb = Strings.appendKeyValue(sb, this.hdfsBlocksLocalityIndex.getName(),
      Long.valueOf(this.hdfsBlocksLocalityIndex.get()));
    return sb.toString();
  }
}
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@ -29,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
@ -488,6 +490,32 @@ public abstract class FSUtils {
    return fs.exists(rootRegionDir);
  }


  /**
   * Compute HDFS blocks distribution of a given file, or a portion of the file
   * @param fs file system
   * @param status file status of the file
   * @param start start position of the portion
   * @param length length of the portion
   * @return The HDFS blocks distribution
   */
  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
    final FileSystem fs, FileStatus status, long start, long length)
    throws IOException {
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    BlockLocation [] blockLocations =
      fs.getFileBlockLocations(status, start, length);
    for(BlockLocation bl : blockLocations) {
      String [] hosts = bl.getHosts();
      long len = bl.getLength();
      blocksDistribution.addHostsAndBlockWeight(hosts, len);
    }

    return blocksDistribution;
  }



  /**
   * Runs through the hbase rootdir and checks all stores have only
   * one file in them -- that is, they've been major compacted.  Looks
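A minimal usage sketch of the new FSUtils helper above, mirroring how the tests added later in this patch call it; fs is assumed to be an existing FileSystem and the path is invented.

// Sketch: block distribution of one HDFS file.
Path file = new Path("/some/hfile");                      // hypothetical path
FileStatus status = fs.getFileStatus(file);
HDFSBlocksDistribution dist =
  FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
String topHost = dist.getTopHosts().get(0);               // host holding the most bytes
long topWeight = dist.getWeight(topHost);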
@ -223,7 +223,27 @@ public class HBaseTestingUtility {
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
    return startMiniDFSCluster(servers, null, null);
  }

  /**
   * Start a minidfscluster.
   * This is useful if you want to run datanodes on distinct hosts for things
   * like HDFS block location verification.
   * If you start MiniDFSCluster without host names, all instances of the
   * datanodes will have the same host name.
   * @param hosts hostnames DNs to run on.
   * @throws Exception
   * @see {@link #shutdownMiniDFSCluster()}
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String hosts[])
  throws Exception {
    if ( hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, null, hosts);
    } else {
      return startMiniDFSCluster(1, null, null);
    }
  }

  /**
@ -236,6 +256,22 @@ public class HBaseTestingUtility {
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final File dir)
  throws Exception {
    return startMiniDFSCluster(servers, dir, null);
  }


  /**
   * Start a minidfscluster.
   * Can only create one.
   * @param servers How many DNs to start.
   * @param dir Where to home your dfs cluster.
   * @param hosts hostnames DNs to run on.
   * @throws Exception
   * @see {@link #shutdownMiniDFSCluster()}
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final File dir, final String hosts[])
  throws Exception {
    // This does the following to home the minidfscluster
    // base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/");
@ -249,7 +285,7 @@ public class HBaseTestingUtility {
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.toString());
    System.setProperty("test.cache.data", this.clusterTestBuildDir.toString());
    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
      true, null, null, null, null);
      true, null, null, hosts, null);
    // Set this just-started cluser as our filesystem.
    FileSystem fs = this.dfsCluster.getFileSystem();
    this.conf.set("fs.defaultFS", fs.getUri().toString());
@ -356,6 +392,20 @@ public class HBaseTestingUtility {
    return startMiniCluster(1, numSlaves);
  }


  /**
   * start minicluster
   * @throws Exception
   * @see {@link #shutdownMiniCluster()}
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
    final int numSlaves)
  throws Exception {
    return startMiniCluster(numMasters, numSlaves, null);
  }


  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
@ -365,18 +415,31 @@ public class HBaseTestingUtility {
   * hbase masters.  If numMasters > 1, you can find the active/primary master
   * with {@link MiniHBaseCluster#getMaster()}.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * datanodes and regionservers.  If numSlaves is > 1, then make sure
   * regionservers.  If dataNodeHosts == null, this also indicates the number of
   * datanodes to start.  If dataNodeHosts != null, the number of datanodes is
   * based on dataNodeHosts.length.
   * If numSlaves is > 1, then make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
   * bind errors.
   * @param dataNodeHosts hostnames DNs to run on.
   * This is useful if you want to run datanodes on distinct hosts for things
   * like HDFS block location verification.
   * If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @throws Exception
   * @see {@link #shutdownMiniCluster()}
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
    final int numSlaves)
    final int numSlaves, final String[] dataNodeHosts)
  throws Exception {
    int numDataNodes = numSlaves;
    if ( dataNodeHosts != null && dataNodeHosts.length != 0) {
      numDataNodes = dataNodeHosts.length;
    }

    LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
      numSlaves + " regionserver(s) and datanode(s)");
      numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)");
    // If we already put up a cluster, fail.
    String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
    isRunningCluster(testBuildPath);
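The new overload above is exercised by the TestHRegion change later in this patch; a minimal sketch of its use (host names invented): one master, three region servers, and three datanodes with distinct hostnames so that block locations can be checked.

// Sketch: mini cluster whose datanodes report distinct host names.
HBaseTestingUtility htu = new HBaseTestingUtility();
String[] dataNodeHosts = new String[] { "host1", "host2", "host3" };
MiniHBaseCluster cluster = htu.startMiniCluster(1, 3, dataNodeHosts);
try {
  // ... assertions that depend on HDFS block locations ...
} finally {
  htu.shutdownMiniCluster();
}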
@ -390,7 +453,7 @@ public class HBaseTestingUtility {
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath());
    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    startMiniDFSCluster(numSlaves, this.clusterTestBuildDir);
    startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts);
    this.dfsCluster.waitClusterUp();

    // Start up a zk cluster.
@ -19,6 +19,7 @@
 */
package org.apache.hadoop.hbase.regionserver;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -33,6 +34,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -40,17 +43,21 @@ import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@ -69,6 +76,7 @@ import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Test;

import com.google.common.collect.Lists;

@ -2962,6 +2970,64 @@ public class TestHRegion extends HBaseTestCase {
    assertTrue(keyValues.length == 0);
  }

  @Test public void testgetHDFSBlocksDistribution() throws Exception {
    HBaseTestingUtility htu = new HBaseTestingUtility();
    final int DEFAULT_BLOCK_SIZE = 1024;
    htu.getConfiguration().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
    htu.getConfiguration().setInt("dfs.replication", 2);


    // set up a cluster with 3 nodes
    MiniHBaseCluster cluster;
    String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
    int regionServersCount = 3;

    try {
      cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
      byte [][] families = {fam1, fam2};
      HTable ht = htu.createTable(Bytes.toBytes(this.getName()), families);

      // Setting up region
      byte row[] = Bytes.toBytes("row1");
      byte col[] = Bytes.toBytes("col1");

      Put put = new Put(row);
      put.add(fam1, col, 1, Bytes.toBytes("test1"));
      put.add(fam2, col, 1, Bytes.toBytes("test2"));
      ht.put(put);

      HRegion firstRegion = htu.getHbaseCluster().
        getRegions(Bytes.toBytes(this.getName())).get(0);
      firstRegion.flushcache();
      HDFSBlocksDistribution blocksDistribution1 =
        firstRegion.getHDFSBlocksDistribution();

      // given the replication factor is set to 2 and we have 2 HFiles,
      // we will have a total of 4 block replicas on 3 datanodes; thus there
      // must be at least one host that has replicas of both HFiles. That host's
      // weight will be equal to the unique block weight.
      long uniqueBlocksWeight1 =
        blocksDistribution1.getUniqueBlocksTotalWeight();

      String topHost = blocksDistribution1.getTopHosts().get(0);
      long topHostWeight = blocksDistribution1.getWeight(topHost);
      assertTrue(uniqueBlocksWeight1 == topHostWeight);

      // use the static method to compute the value; it should be the same.
      // the static method is used by the load balancer and other components
      HDFSBlocksDistribution blocksDistribution2 =
        HRegion.computeHDFSBlocksDistribution(htu.getConfiguration(),
        firstRegion.getTableDesc(),
        firstRegion.getRegionInfo().getEncodedName());
      long uniqueBlocksWeight2 =
        blocksDistribution2.getUniqueBlocksTotalWeight();

      assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
    } finally {
      htu.shutdownMiniCluster();
    }
  }

  private void putData(int startRow, int numRows, byte [] qf,
    byte [] ...families)
  throws IOException {
@ -23,6 +23,11 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;

@ -30,6 +35,7 @@ import org.junit.Test;
 * Test {@link FSUtils}.
 */
public class TestFSUtils {

  @Test public void testIsHDFS() throws Exception {
    HBaseTestingUtility htu = new HBaseTestingUtility();
    htu.getConfiguration().setBoolean("dfs.support.append", false);
@ -44,4 +50,100 @@ public class TestFSUtils {
      if (cluster != null) cluster.shutdown();
    }
  }

  private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize)
  throws Exception {
    FSDataOutputStream out = fs.create(file);
    byte [] data = new byte[dataSize];
    out.write(data, 0, dataSize);
    out.close();
  }

  @Test public void testcomputeHDFSBlocksDistribution() throws Exception {
    HBaseTestingUtility htu = new HBaseTestingUtility();
    final int DEFAULT_BLOCK_SIZE = 1024;
    htu.getConfiguration().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
    MiniDFSCluster cluster = null;
    Path testFile = null;

    try {
      // set up a cluster with 3 nodes
      String hosts[] = new String[] { "host1", "host2", "host3" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with two blocks
      testFile = new Path("/test1.txt");
      WriteDataToHDFS(fs, testFile, 2*DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, the same as the number of
      // datanodes, the locality index for each host should be 100%,
      // i.e. getWeight for each host should equal getUniqueBlocksTotalWeight
      FileStatus status = fs.getFileStatus(testFile);
      HDFSBlocksDistribution blocksDistribution =
        FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
      long uniqueBlocksTotalWeight =
        blocksDistribution.getUniqueBlocksTotalWeight();
      for (String host : hosts) {
        long weight = blocksDistribution.getWeight(host);
        assertTrue(uniqueBlocksTotalWeight == weight);
      }
    } finally {
      htu.shutdownMiniDFSCluster();
    }


    try {
      // set up a cluster with 4 nodes
      String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with three blocks
      testFile = new Path("/test2.txt");
      WriteDataToHDFS(fs, testFile, 3*DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, we will have a total of 9
      // block replicas; thus the host with the highest weight should have
      // weight == 3 * DEFAULT_BLOCK_SIZE
      FileStatus status = fs.getFileStatus(testFile);
      HDFSBlocksDistribution blocksDistribution =
        FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
      long uniqueBlocksTotalWeight =
        blocksDistribution.getUniqueBlocksTotalWeight();

      String tophost = blocksDistribution.getTopHosts().get(0);
      long weight = blocksDistribution.getWeight(tophost);
      assertTrue(uniqueBlocksTotalWeight == weight);

    } finally {
      htu.shutdownMiniDFSCluster();
    }


    try {
      // set up a cluster with 4 nodes
      String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with one block
      testFile = new Path("/test3.txt");
      WriteDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, we will have a total of 3
      // block replicas; thus there is one host without weight
      FileStatus status = fs.getFileStatus(testFile);
      HDFSBlocksDistribution blocksDistribution =
        FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
      assertTrue(blocksDistribution.getTopHosts().size() == 3);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

  }

}