HBASE-10413 Tablesplit.getLength returns 0 (Lukas Nalezenec)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1566768 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2014-02-10 22:29:49 +00:00
parent f823cca32c
commit fa77c2a3e5
6 changed files with 438 additions and 16 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
@ -25,12 +26,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@ -73,6 +77,7 @@ public abstract class MultiTableInputFormatBase extends
InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
TableSplit tSplit = (TableSplit) split;
LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));
if (tSplit.getTableName() == null) {
throw new IOException("Cannot create a record reader because of a"
@ -139,12 +144,15 @@ public abstract class MultiTableInputFormatBase extends
byte[] startRow = scan.getStartRow();
byte[] stopRow = scan.getStopRow();
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table);
for (int i = 0; i < keys.getFirst().length; i++) {
if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue;
}
String regionLocation =
table.getRegionLocation(keys.getFirst()[i], false).getHostname();
HRegionLocation hregionLocation = table.getRegionLocation(keys.getFirst()[i], false);
String regionHostname = hregionLocation.getHostname();
HRegionInfo regionInfo = hregionLocation.getRegionInfo();
// determine if the given start and stop keys fall into the range
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
@ -159,9 +167,12 @@ public abstract class MultiTableInputFormatBase extends
(stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i],
stopRow) <= 0) && keys.getSecond()[i].length > 0 ? keys
.getSecond()[i] : stopRow;
InputSplit split =
long regionSize = sizeCalculator.getRegionSize(regionInfo.getRegionName());
TableSplit split =
new TableSplit(table.getName(),
scan, splitStart, splitStop, regionLocation);
scan, splitStart, splitStop, regionHostname, regionSize);
splits.add(split);
if (LOG.isDebugEnabled())
LOG.debug("getSplits: split -> " + (count++) + " -> " + split);

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@ -121,6 +123,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
" the task's full log for more details.");
}
TableSplit tSplit = (TableSplit) split;
LOG.info("Input split length: " + tSplit.getLength() + " bytes.");
TableRecordReader trr = this.tableRecordReader;
// if no table record reader was provided use default
if (trr == null) {
@ -153,6 +156,8 @@ extends InputFormat<ImmutableBytesWritable, Result> {
this.nameServer =
context.getConfiguration().get("hbase.nameserver.address", null);
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table);
Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null ||
keys.getFirst().length == 0) {
@ -161,9 +166,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
throw new IOException("Expecting at least one region.");
}
List<InputSplit> splits = new ArrayList<InputSplit>(1);
InputSplit split = new TableSplit(table.getName(),
long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
TableSplit split = new TableSplit(table.getName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
.getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0]);
.getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
splits.add(split);
return splits;
}
@ -201,8 +207,11 @@ extends InputFormat<ImmutableBytesWritable, Result> {
Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
keys.getSecond()[i].length > 0 ?
keys.getSecond()[i] : stopRow;
InputSplit split = new TableSplit(table.getName(),
splitStart, splitStop, regionLocation);
byte[] regionName = location.getRegionInfo().getRegionName();
long regionSize = sizeCalculator.getRegionSize(regionName);
TableSplit split = new TableSplit(table.getName(),
splitStart, splitStop, regionLocation, regionSize);
splits.add(split);
if (LOG.isDebugEnabled()) {
LOG.debug("getSplits: split -> " + i + " -> " + split);

View File

@ -82,6 +82,7 @@ implements Writable, Comparable<TableSplit> {
private byte [] endRow;
private String regionLocation;
private String scan = ""; // stores the serialized form of the Scan
private long length; // Contains estimation of region size in bytes
/** Default constructor. */
public TableSplit() {
@ -100,6 +101,7 @@ implements Writable, Comparable<TableSplit> {
/**
* Creates a new instance while assigning all variables.
* Length of region is set to 0
*
* @param tableName The name of the current table.
* @param scan The scan associated with this split.
@ -109,6 +111,20 @@ implements Writable, Comparable<TableSplit> {
*/
public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
final String location) {
this(tableName, scan, startRow, endRow, location, 0L);
}
/**
* Creates a new instance while assigning all variables.
*
* @param tableName The name of the current table.
* @param scan The scan associated with this split.
* @param startRow The start row of the split.
* @param endRow The end row of the split.
* @param location The location of the region.
*/
public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
final String location, long length) {
this.tableName = tableName;
try {
this.scan =
@ -119,6 +135,7 @@ implements Writable, Comparable<TableSplit> {
this.startRow = startRow;
this.endRow = endRow;
this.regionLocation = location;
this.length = length;
}
/**
@ -143,6 +160,20 @@ implements Writable, Comparable<TableSplit> {
this(tableName, null, startRow, endRow, location);
}
/**
* Creates a new instance without a scanner.
*
* @param tableName The name of the current table.
* @param startRow The start row of the split.
* @param endRow The end row of the split.
* @param location The location of the region.
* @param length Size of region in bytes
*/
public TableSplit(TableName tableName, byte[] startRow, byte[] endRow,
final String location, long length) {
this(tableName, null, startRow, endRow, location, length);
}
/**
* Returns a Scan object from the stored string representation.
*
@ -220,8 +251,7 @@ implements Writable, Comparable<TableSplit> {
*/
@Override
public long getLength() {
// Not clear how to obtain this... seems to be used only for sorting splits
return 0;
return length;
}
/**
@ -256,6 +286,7 @@ implements Writable, Comparable<TableSplit> {
if (version.atLeast(Version.INITIAL)) {
scan = Bytes.toString(Bytes.readByteArray(in));
}
length = WritableUtils.readVLong(in);
}
/**
@ -272,6 +303,7 @@ implements Writable, Comparable<TableSplit> {
Bytes.writeByteArray(out, endRow);
Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
Bytes.writeByteArray(out, Bytes.toBytes(scan));
WritableUtils.writeVLong(out, length);
}
/**

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
/**
* Computes size of each region for given table and given column families.
* The value is used by MapReduce for better scheduling.
* */
@InterfaceStability.Evolving
@InterfaceAudience.Private
public class RegionSizeCalculator {
private final Log LOG = LogFactory.getLog(RegionSizeCalculator.class);
/**
* Maps each region to its size in bytes.
* */
private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
/**
* Computes size of each region for table and given column families.
* */
public RegionSizeCalculator(HTable table) throws IOException {
this(table, new HBaseAdmin(table.getConfiguration()));
}
/** ctor for unit testing */
RegionSizeCalculator (HTable table, HBaseAdmin admin) throws IOException {
try {
if (!enabled(table.getConfiguration())) {
LOG.info("Region size calculation disabled.");
return;
}
LOG.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
//get regions for table
Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
for (HRegionInfo regionInfo : tableRegionInfos) {
tableRegions.add(regionInfo.getRegionName());
}
ClusterStatus clusterStatus = admin.getClusterStatus();
Collection<ServerName> servers = clusterStatus.getServers();
final long megaByte = 1024L * 1024L;
//iterate all cluster regions, filter regions from our table and compute their size
for (ServerName serverName: servers) {
ServerLoad serverLoad = clusterStatus.getLoad(serverName);
for (RegionLoad regionLoad: serverLoad.getRegionsLoad().values()) {
byte[] regionId = regionLoad.getName();
if (tableRegions.contains(regionId)) {
long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte;
sizeMap.put(regionId, regionSizeBytes);
if (LOG.isDebugEnabled()) {
LOG.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeBytes);
}
}
}
}
LOG.debug("Region sizes calculated");
} finally {
admin.close();
}
}
boolean enabled(Configuration configuration) {
return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true);
}
/**
* Returns size of given region in bytes. Returns 0 if region was not found.
* */
public long getRegionSize(byte[] regionId) {
Long size = sizeMap.get(regionId);
if (size == null) {
LOG.debug("Unknown region:" + Arrays.toString(regionId));
return 0;
} else {
return size;
}
}
public Map<byte[], Long> getRegionSizeMap() {
return Collections.unmodifiableMap(sizeMap);
}
}

View File

@ -17,14 +17,17 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.HashSet;
import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category(SmallTests.class)
@ -45,5 +48,42 @@ public class TestTableSplit {
assertTrue(set.size() == 1);
}
/**
* length of region should not influence hashcode
* */
@Test
public void testHashCode_length() {
TableSplit split1 = new TableSplit(TableName.valueOf("table"),
"row-start".getBytes(),
"row-end".getBytes(), "location", 1984);
TableSplit split2 = new TableSplit(TableName.valueOf("table"),
"row-start".getBytes(),
"row-end".getBytes(), "location", 1982);
assertEquals (split1, split2);
assertTrue (split1.hashCode() == split2.hashCode());
HashSet<TableSplit> set = new HashSet<TableSplit>(2);
set.add(split1);
set.add(split2);
assertTrue(set.size() == 1);
}
/**
* Length of region need to be properly serialized.
* */
@Test
public void testLengthIsSerialized() throws Exception {
TableSplit split1 = new TableSplit(TableName.valueOf("table"),
"row-start".getBytes(),
"row-end".getBytes(), "location", 666);
TableSplit deserialized = new TableSplit(TableName.valueOf("table"),
"row-start2".getBytes(),
"row-end2".getBytes(), "location1");
ReflectionUtils.copy(new Configuration(), split1, deserialized);
Assert.assertEquals(666, deserialized.getLength());
}
}

View File

@ -0,0 +1,195 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Category(SmallTests.class)
public class TestRegionSizeCalculator {
private Configuration configuration = new Configuration();
private final long megabyte = 1024L * 1024L;
@Test
public void testSimpleTestCase() throws Exception {
HTable table = mockTable("region1", "region2", "region3");
HBaseAdmin admin = mockAdmin(
mockServer(
mockRegion("region1", 123),
mockRegion("region3", 1232)
),
mockServer(
mockRegion("region2", 54321),
mockRegion("otherTableRegion", 110)
)
);
RegionSizeCalculator calculator = new RegionSizeCalculator(table, admin);
assertEquals(123 * megabyte, calculator.getRegionSize("region1".getBytes()));
assertEquals(54321 * megabyte, calculator.getRegionSize("region2".getBytes()));
assertEquals(1232 * megabyte, calculator.getRegionSize("region3".getBytes()));
// if region is not inside our table, it should return 0
assertEquals(0 * megabyte, calculator.getRegionSize("otherTableRegion".getBytes()));
assertEquals(3, calculator.getRegionSizeMap().size());
}
/**
* When size of region in megabytes is larger than largest possible integer there could be
* error caused by lost of precision.
* */
@Test
public void testLargeRegion() throws Exception {
HTable table = mockTable("largeRegion");
HBaseAdmin admin = mockAdmin(
mockServer(
mockRegion("largeRegion", Integer.MAX_VALUE)
)
);
RegionSizeCalculator calculator = new RegionSizeCalculator(table, admin);
assertEquals(((long) Integer.MAX_VALUE) * megabyte, calculator.getRegionSize("largeRegion".getBytes()));
}
/** When calculator is disabled, it should return 0 for each request.*/
@Test
public void testDisabled() throws Exception {
String regionName = "cz.goout:/index.html";
HTable table = mockTable(regionName);
HBaseAdmin admin = mockAdmin(
mockServer(
mockRegion(regionName, 999)
)
);
//first request on enabled calculator
RegionSizeCalculator calculator = new RegionSizeCalculator(table, admin);
assertEquals(999 * megabyte, calculator.getRegionSize(regionName.getBytes()));
//then disabled calculator.
configuration.setBoolean(RegionSizeCalculator.ENABLE_REGIONSIZECALCULATOR, false);
RegionSizeCalculator disabledCalculator = new RegionSizeCalculator(table, admin);
assertEquals(0 * megabyte, disabledCalculator.getRegionSize(regionName.getBytes()));
assertEquals(0, disabledCalculator.getRegionSizeMap().size());
}
/**
* Makes some table with given region names.
* */
private HTable mockTable(String... regionNames) throws IOException {
HTable mockedTable = Mockito.mock(HTable.class);
when(mockedTable.getConfiguration()).thenReturn(configuration);
when(mockedTable.getTableName()).thenReturn("sizeTestTable".getBytes());
NavigableMap<HRegionInfo, ServerName> regionLocations = new TreeMap<HRegionInfo, ServerName>();
when(mockedTable.getRegionLocations()).thenReturn(regionLocations);
for (String regionName : regionNames) {
HRegionInfo info = Mockito.mock(HRegionInfo.class);
when(info.getRegionName()).thenReturn(regionName.getBytes());
regionLocations.put(info, null);//we are not interested in values
}
return mockedTable;
}
/**
* Creates mock returing ClusterStatus info about given servers.
*/
private HBaseAdmin mockAdmin(ServerLoad... servers) throws Exception {
//get clusterstatus
HBaseAdmin mockAdmin = Mockito.mock(HBaseAdmin.class);
ClusterStatus clusterStatus = mockCluster(servers);
when(mockAdmin.getClusterStatus()).thenReturn(clusterStatus);
return mockAdmin;
}
/**
* Creates mock of region with given name and size.
*
* @param fileSizeMb number of megabytes occupied by region in file store in megabytes
* */
private RegionLoad mockRegion(String regionName, int fileSizeMb) {
RegionLoad region = Mockito.mock(RegionLoad.class);
when(region.getName()).thenReturn(regionName.getBytes());
when(region.getNameAsString()).thenReturn(regionName);
when(region.getStorefileSizeMB()).thenReturn(fileSizeMb);
return region;
}
private ClusterStatus mockCluster(ServerLoad[] servers) {
List<ServerName> serverNames = new ArrayList<ServerName>();
ClusterStatus clusterStatus = Mockito.mock(ClusterStatus.class);
when(clusterStatus.getServers()).thenReturn(serverNames);
int serverCounter = 0;
for (ServerLoad server : servers) {
ServerName serverName = mock(ServerName.class);
when(serverName.getServerName()).thenReturn("server" + (serverCounter++));
serverNames.add(serverName);
when(clusterStatus.getLoad(serverName)).thenReturn(server);
}
return clusterStatus;
}
/** Creates mock of region server with given regions*/
private ServerLoad mockServer(RegionLoad... regions) {
ServerLoad serverLoad = Mockito.mock(ServerLoad.class);
Map<byte[], RegionLoad> regionMap = new TreeMap<byte[], RegionLoad>(Bytes.BYTES_COMPARATOR);
for (RegionLoad regionName : regions) {
regionMap.put(regionName.getName(), regionName);
}
when(serverLoad.getRegionsLoad()).thenReturn(regionMap);
return serverLoad;
}
}