HBASE-8369 MapReduce over snapshot files

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1543195 13f79535-47bb-0310-9956-ffa450edef68
Enis Soztutar 2013-11-18 22:17:39 +00:00
parent c28ac79c82
commit a26dd714b5
22 changed files with 3125 additions and 140 deletions


@ -18,9 +18,12 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Helper class for custom client scanners.
@ -28,6 +31,49 @@ import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public abstract class AbstractClientScanner implements ResultScanner {
protected ScanMetrics scanMetrics;
/**
* Check whether the application wants to collect scan metrics and, if so, initialize them.
*/
protected void initScanMetrics(Scan scan) {
// check if application wants to collect scan metrics
byte[] enableMetrics = scan.getAttribute(
Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
scanMetrics = new ScanMetrics();
}
}
// TODO: should this be at ResultScanner? ScanMetrics is not public API it seems.
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
/**
* Get up to {@code nbRows} rows.
* How many RPCs are made is determined by the {@link Scan#setCaching(int)}
* setting (or hbase.client.scanner.caching in hbase-site.xml).
* @param nbRows number of rows to return
* @return Between zero and {@code nbRows} Results. The scan is done if the
* returned array is zero-length (we never return null).
* @throws IOException
*/
@Override
public Result [] next(int nbRows) throws IOException {
// Collect values to be returned here
ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
for(int i = 0; i < nbRows; i++) {
Result next = next();
if (next != null) {
resultSets.add(next);
} else {
break;
}
}
return resultSets.toArray(new Result[resultSets.size()]);
}
@Override
public Iterator<Result> iterator() {
return new Iterator<Result>() {
@ -38,6 +84,7 @@ public abstract class AbstractClientScanner implements ResultScanner {
// this method is where the actual advancing takes place, but you need
// to call next() to consume it. hasNext() will only advance if there
// isn't a pending next().
@Override
public boolean hasNext() {
if (next == null) {
try {
@ -52,6 +99,7 @@ public abstract class AbstractClientScanner implements ResultScanner {
// get the pending next item and advance the iterator. returns null if
// there is no next item.
@Override
public Result next() {
// since hasNext() does the real advancing, we call this to determine
// if there is a next before proceeding.
@ -67,6 +115,7 @@ public abstract class AbstractClientScanner implements ResultScanner {
return temp;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
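
The scan-metrics support consolidated into AbstractClientScanner above is opt-in: initScanMetrics() only allocates a ScanMetrics object when the corresponding attribute is set on the Scan. A minimal client-side sketch of enabling it (it assumes the usual HBase client imports and an already-created HTable named table; these names are illustrative, not part of this patch):

Scan scan = new Scan();
// opt in to metrics collection; initScanMetrics() checks this attribute
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(true));
ResultScanner scanner = table.getScanner(scan);
for (Result r : scanner) {
  // consume rows; the scanner accumulates metrics as it goes
}
scanner.close();
// ClientScanner extends AbstractClientScanner, so the collected metrics are reachable here
ScanMetrics metrics = ((AbstractClientScanner) scanner).getScanMetrics();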


@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@ -62,7 +60,6 @@ public class ClientScanner extends AbstractClientScanner {
protected long lastNext;
// Keep lastResult returned successfully in case we have to reset scanner.
protected Result lastResult = null;
protected ScanMetrics scanMetrics = null;
protected final long maxScannerResultSize;
private final HConnection connection;
private final TableName tableName;
@ -151,11 +148,7 @@ public class ClientScanner extends AbstractClientScanner {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
// check if application wants to collect scan metrics
byte[] enableMetrics = scan.getAttribute(
Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
scanMetrics = new ScanMetrics();
}
initScanMetrics(scan);
// Use the caching from the Scan. If not set, use the default cache setting for this table.
if (this.scan.getCaching() > 0) {
@ -170,7 +163,7 @@ public class ClientScanner extends AbstractClientScanner {
initializeScannerInConstruction();
}
protected void initializeScannerInConstruction() throws IOException{
// initialize the scanner
nextScanner(this.caching, false);
@ -429,30 +422,6 @@ public class ClientScanner extends AbstractClientScanner {
return null;
}
/**
* Get <param>nbRows</param> rows.
* How many RPCs are made is determined by the {@link Scan#setCaching(int)}
* setting (or hbase.client.scanner.caching in hbase-site.xml).
* @param nbRows number of rows to return
* @return Between zero and <param>nbRows</param> RowResults. Scan is done
* if returned array is of zero-length (We never return null).
* @throws IOException
*/
@Override
public Result [] next(int nbRows) throws IOException {
// Collect values to be returned here
ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
for(int i = 0; i < nbRows; i++) {
Result next = next();
if (next != null) {
resultSets.add(next);
} else {
break;
}
}
return resultSets.toArray(new Result[resultSets.size()]);
}
@Override
public void close() {
if (!scanMetricsPublished) writeScanMetrics();


@ -82,7 +82,7 @@ public final class CellUtil {
copyValueTo(cell, output, 0);
return output;
}
public static byte[] getTagArray(Cell cell){
byte[] output = new byte[cell.getTagsLength()];
copyTagTo(cell, output, 0);
@ -414,4 +414,16 @@ public final class CellUtil {
}
};
}
/**
* Returns true if the first range start1...end1 overlaps with the second range
* start2...end2, assuming the byte arrays represent row keys
*/
public static boolean overlappingKeys(final byte[] start1, final byte[] end1,
final byte[] start2, final byte[] end2) {
return (end2.length == 0 || start1.length == 0 || Bytes.compareTo(start1,
end2) < 0)
&& (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2,
end1) < 0);
}
}


@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestCellUtil {
@Test
public void testOverlappingKeys() {
byte[] empty = HConstants.EMPTY_BYTE_ARRAY;
byte[] a = Bytes.toBytes("a");
byte[] b = Bytes.toBytes("b");
byte[] c = Bytes.toBytes("c");
byte[] d = Bytes.toBytes("d");
// overlaps
Assert.assertTrue(CellUtil.overlappingKeys(a, b, a, b));
Assert.assertTrue(CellUtil.overlappingKeys(a, c, a, b));
Assert.assertTrue(CellUtil.overlappingKeys(a, b, a, c));
Assert.assertTrue(CellUtil.overlappingKeys(b, c, a, c));
Assert.assertTrue(CellUtil.overlappingKeys(a, c, b, c));
Assert.assertTrue(CellUtil.overlappingKeys(a, d, b, c));
Assert.assertTrue(CellUtil.overlappingKeys(b, c, a, d));
Assert.assertTrue(CellUtil.overlappingKeys(empty, b, a, b));
Assert.assertTrue(CellUtil.overlappingKeys(empty, b, a, c));
Assert.assertTrue(CellUtil.overlappingKeys(a, b, empty, b));
Assert.assertTrue(CellUtil.overlappingKeys(a, b, empty, c));
Assert.assertTrue(CellUtil.overlappingKeys(a, empty, a, b));
Assert.assertTrue(CellUtil.overlappingKeys(a, empty, a, c));
Assert.assertTrue(CellUtil.overlappingKeys(a, b, empty, empty));
Assert.assertTrue(CellUtil.overlappingKeys(empty, empty, a, b));
// non overlaps
Assert.assertFalse(CellUtil.overlappingKeys(a, b, c, d));
Assert.assertFalse(CellUtil.overlappingKeys(c, d, a, b));
Assert.assertFalse(CellUtil.overlappingKeys(b, c, c, d));
Assert.assertFalse(CellUtil.overlappingKeys(b, c, c, empty));
Assert.assertFalse(CellUtil.overlappingKeys(b, c, d, empty));
Assert.assertFalse(CellUtil.overlappingKeys(c, d, b, c));
Assert.assertFalse(CellUtil.overlappingKeys(c, empty, b, c));
Assert.assertFalse(CellUtil.overlappingKeys(d, empty, b, c));
Assert.assertFalse(CellUtil.overlappingKeys(b, c, a, b));
Assert.assertFalse(CellUtil.overlappingKeys(b, c, empty, b));
Assert.assertFalse(CellUtil.overlappingKeys(b, c, empty, a));
Assert.assertFalse(CellUtil.overlappingKeys(a,b, b, c));
Assert.assertFalse(CellUtil.overlappingKeys(empty, b, b, c));
Assert.assertFalse(CellUtil.overlappingKeys(empty, a, b, c));
}
}


@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.experimental.categories.Category;
/**
* An integration test to test {@link TableSnapshotInputFormat} which enables
* reading directly from snapshot files without going through hbase servers.
*
* This test creates a table and loads it with rows ranging from 'aaa' to 'zzz';
* for each row, the columns f1:(null) and f2:(null) are set to the same value as
* the row key.
* <pre>
* aaa, f1: => aaa
* aaa, f2: => aaa
* aab, f1: => aab
* ....
* zzz, f2: => zzz
* </pre>
*
* Then the test creates a snapshot of this table and overwrites the values in the original
* table with the value 'after_snapshot_value'. The test then runs a mapreduce job over the
* snapshot with a scan start row 'bbb' and stop row 'yyy'. The data is saved in a single
* reduce output file and inspected later to verify that the MR job has seen all the values
* from the snapshot.
*
* <p> These parameters can be used to configure the job:
* <br>"IntegrationTestTableSnapshotInputFormat.table" =&gt; the name of the table
* <br>"IntegrationTestTableSnapshotInputFormat.snapshot" =&gt; the name of the snapshot
* <br>"IntegrationTestTableSnapshotInputFormat.numRegions" =&gt; number of regions in the table to be created
* <br>"IntegrationTestTableSnapshotInputFormat.tableDir" =&gt; temporary directory to restore the snapshot files
*
*/
@Category(IntegrationTests.class)
// Not runnable as a unit test. See TestTableSnapshotInputFormat
public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase {
private static final Log LOG = LogFactory.getLog(IntegrationTestTableSnapshotInputFormat.class);
private static final String TABLE_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.table";
private static final String DEFAULT_TABLE_NAME = "IntegrationTestTableSnapshotInputFormat";
private static final String SNAPSHOT_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.snapshot";
private static final String NUM_REGIONS_KEY = "IntegrationTestTableSnapshotInputFormat.numRegions";
private static final int DEFAULT_NUM_REGIONS = 32;
private static final String TABLE_DIR_KEY = "IntegrationTestTableSnapshotInputFormat.tableDir";
private IntegrationTestingUtility util;
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
util = getTestingUtil(conf);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
util = getTestingUtil(getConf());
util.initializeCluster(1);
this.setConf(util.getConfiguration());
}
@Override
@After
public void cleanUp() throws Exception {
util.restoreCluster();
}
@Override
public void setUpCluster() throws Exception {
}
@Override
public int runTestFromCommandLine() throws Exception {
Configuration conf = getConf();
TableName tableName = TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
String snapshotName = conf.get(SNAPSHOT_NAME_KEY, tableName.getQualifierAsString()
+ "_snapshot_" + System.currentTimeMillis());
int numRegions = conf.getInt(NUM_REGIONS_KEY, DEFAULT_NUM_REGIONS);
String tableDirStr = conf.get(TABLE_DIR_KEY);
Path tableDir;
if (tableDirStr == null) {
tableDir = util.getDataTestDirOnTestFS(tableName.getQualifierAsString());
} else {
tableDir = new Path(tableDirStr);
}
/* We create the table using HBaseAdmin#createTable(), which will create the table
* with the desired number of regions. We pass bbb as startKey and yyy as endKey, so if
* desiredNumRegions is > 2, we create the regions (empty, bbb) and (yyy, empty), plus
* numRegions - 2 regions between bbb and yyy. The test uses a Scan with startRow
* bbb and endRow yyy, so we expect the first and last region to be filtered out by
* the input format, and numRegions - 2 splits between bbb and yyy (for the default
* 32 regions, that is 30 splits).
*/
int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
TestTableSnapshotInputFormat.doTestWithMapReduce(util, tableName, snapshotName, tableDir,
numRegions, expectedNumSplits, false);
return 0;
}
@Override // CM is not intended to be run with this test
public String getTablename() {
return null;
}
@Override
protected Set<String> getColumnFamilies() {
return null;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
IntegrationTestingUtility.setUseDistributedCluster(conf);
int ret = ToolRunner.run(conf, new IntegrationTestTableSnapshotInputFormat(), args);
System.exit(ret);
}
}
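
The integration test above is launched through ToolRunner, so the configuration keys documented in the class comment can simply be set on the Configuration before running it. A hedged sketch of a programmatic launch (the table name, region count, and restore path are illustrative values, not defaults from the patch):

Configuration conf = HBaseConfiguration.create();
conf.set("IntegrationTestTableSnapshotInputFormat.table", "snapshotIT");
conf.setInt("IntegrationTestTableSnapshotInputFormat.numRegions", 16);
conf.set("IntegrationTestTableSnapshotInputFormat.tableDir", "/tmp/snapshot-it-restore");
IntegrationTestingUtility.setUseDistributedCluster(conf);
System.exit(ToolRunner.run(conf, new IntegrationTestTableSnapshotInputFormat(), new String[0]));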


@ -717,11 +717,764 @@ public final class MapReduceProtos {
// @@protoc_insertion_point(class_scope:ScanMetrics)
}
public interface TableSnapshotRegionSplitOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// optional .RegionSpecifier region = 1;
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
boolean hasRegion();
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegion();
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionOrBuilder();
// repeated string locations = 2;
/**
* <code>repeated string locations = 2;</code>
*/
java.util.List<java.lang.String>
getLocationsList();
/**
* <code>repeated string locations = 2;</code>
*/
int getLocationsCount();
/**
* <code>repeated string locations = 2;</code>
*/
java.lang.String getLocations(int index);
/**
* <code>repeated string locations = 2;</code>
*/
com.google.protobuf.ByteString
getLocationsBytes(int index);
}
/**
* Protobuf type {@code TableSnapshotRegionSplit}
*/
public static final class TableSnapshotRegionSplit extends
com.google.protobuf.GeneratedMessage
implements TableSnapshotRegionSplitOrBuilder {
// Use TableSnapshotRegionSplit.newBuilder() to construct.
private TableSnapshotRegionSplit(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
private TableSnapshotRegionSplit(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
private static final TableSnapshotRegionSplit defaultInstance;
public static TableSnapshotRegionSplit getDefaultInstance() {
return defaultInstance;
}
public TableSnapshotRegionSplit getDefaultInstanceForType() {
return defaultInstance;
}
private final com.google.protobuf.UnknownFieldSet unknownFields;
@java.lang.Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private TableSnapshotRegionSplit(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
case 10: {
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder subBuilder = null;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
subBuilder = region_.toBuilder();
}
region_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.PARSER, extensionRegistry);
if (subBuilder != null) {
subBuilder.mergeFrom(region_);
region_ = subBuilder.buildPartial();
}
bitField0_ |= 0x00000001;
break;
}
case 18: {
if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
locations_ = new com.google.protobuf.LazyStringArrayList();
mutable_bitField0_ |= 0x00000002;
}
locations_.add(input.readBytes());
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new com.google.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
locations_ = new com.google.protobuf.UnmodifiableLazyStringList(locations_);
}
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_fieldAccessorTable
.ensureFieldAccessorsInitialized(
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.class, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.Builder.class);
}
public static com.google.protobuf.Parser<TableSnapshotRegionSplit> PARSER =
new com.google.protobuf.AbstractParser<TableSnapshotRegionSplit>() {
public TableSnapshotRegionSplit parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new TableSnapshotRegionSplit(input, extensionRegistry);
}
};
@java.lang.Override
public com.google.protobuf.Parser<TableSnapshotRegionSplit> getParserForType() {
return PARSER;
}
private int bitField0_;
// optional .RegionSpecifier region = 1;
public static final int REGION_FIELD_NUMBER = 1;
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier region_;
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
public boolean hasRegion() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegion() {
return region_;
}
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionOrBuilder() {
return region_;
}
// repeated string locations = 2;
public static final int LOCATIONS_FIELD_NUMBER = 2;
private com.google.protobuf.LazyStringList locations_;
/**
* <code>repeated string locations = 2;</code>
*/
public java.util.List<java.lang.String>
getLocationsList() {
return locations_;
}
/**
* <code>repeated string locations = 2;</code>
*/
public int getLocationsCount() {
return locations_.size();
}
/**
* <code>repeated string locations = 2;</code>
*/
public java.lang.String getLocations(int index) {
return locations_.get(index);
}
/**
* <code>repeated string locations = 2;</code>
*/
public com.google.protobuf.ByteString
getLocationsBytes(int index) {
return locations_.getByteString(index);
}
private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
locations_ = com.google.protobuf.LazyStringArrayList.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (hasRegion()) {
if (!getRegion().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeMessage(1, region_);
}
for (int i = 0; i < locations_.size(); i++) {
output.writeBytes(2, locations_.getByteString(i));
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(1, region_);
}
{
int dataSize = 0;
for (int i = 0; i < locations_.size(); i++) {
dataSize += com.google.protobuf.CodedOutputStream
.computeBytesSizeNoTag(locations_.getByteString(i));
}
size += dataSize;
size += 1 * getLocationsList().size();
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
@java.lang.Override
public boolean equals(final java.lang.Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit)) {
return super.equals(obj);
}
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit other = (org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit) obj;
boolean result = true;
result = result && (hasRegion() == other.hasRegion());
if (hasRegion()) {
result = result && getRegion()
.equals(other.getRegion());
}
result = result && getLocationsList()
.equals(other.getLocationsList());
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
}
private int memoizedHashCode = 0;
@java.lang.Override
public int hashCode() {
if (memoizedHashCode != 0) {
return memoizedHashCode;
}
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
if (hasRegion()) {
hash = (37 * hash) + REGION_FIELD_NUMBER;
hash = (53 * hash) + getRegion().hashCode();
}
if (getLocationsCount() > 0) {
hash = (37 * hash) + LOCATIONS_FIELD_NUMBER;
hash = (53 * hash) + getLocationsList().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
}
public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code TableSnapshotRegionSplit}
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplitOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_fieldAccessorTable
.ensureFieldAccessorsInitialized(
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.class, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.Builder.class);
}
// Construct using org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getRegionFieldBuilder();
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
if (regionBuilder_ == null) {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
} else {
regionBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
locations_ = com.google.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_descriptor;
}
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit getDefaultInstanceForType() {
return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.getDefaultInstance();
}
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit build() {
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit result = new org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
if (regionBuilder_ == null) {
result.region_ = region_;
} else {
result.region_ = regionBuilder_.build();
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
locations_ = new com.google.protobuf.UnmodifiableLazyStringList(
locations_);
bitField0_ = (bitField0_ & ~0x00000002);
}
result.locations_ = locations_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit) {
return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit other) {
if (other == org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.getDefaultInstance()) return this;
if (other.hasRegion()) {
mergeRegion(other.getRegion());
}
if (!other.locations_.isEmpty()) {
if (locations_.isEmpty()) {
locations_ = other.locations_;
bitField0_ = (bitField0_ & ~0x00000002);
} else {
ensureLocationsIsMutable();
locations_.addAll(other.locations_);
}
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (hasRegion()) {
if (!getRegion().isInitialized()) {
return false;
}
}
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
// optional .RegionSpecifier region = 1;
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder> regionBuilder_;
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
public boolean hasRegion() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegion() {
if (regionBuilder_ == null) {
return region_;
} else {
return regionBuilder_.getMessage();
}
}
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
public Builder setRegion(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier value) {
if (regionBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
region_ = value;
onChanged();
} else {
regionBuilder_.setMessage(value);
}
bitField0_ |= 0x00000001;
return this;
}
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
public Builder setRegion(
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder builderForValue) {
if (regionBuilder_ == null) {
region_ = builderForValue.build();
onChanged();
} else {
regionBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000001;
return this;
}
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
public Builder mergeRegion(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier value) {
if (regionBuilder_ == null) {
if (((bitField0_ & 0x00000001) == 0x00000001) &&
region_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance()) {
region_ =
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.newBuilder(region_).mergeFrom(value).buildPartial();
} else {
region_ = value;
}
onChanged();
} else {
regionBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000001;
return this;
}
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
public Builder clearRegion() {
if (regionBuilder_ == null) {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
onChanged();
} else {
regionBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder getRegionBuilder() {
bitField0_ |= 0x00000001;
onChanged();
return getRegionFieldBuilder().getBuilder();
}
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionOrBuilder() {
if (regionBuilder_ != null) {
return regionBuilder_.getMessageOrBuilder();
} else {
return region_;
}
}
/**
* <code>optional .RegionSpecifier region = 1;</code>
*/
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder>
getRegionFieldBuilder() {
if (regionBuilder_ == null) {
regionBuilder_ = new com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder>(
region_,
getParentForChildren(),
isClean());
region_ = null;
}
return regionBuilder_;
}
// repeated string locations = 2;
private com.google.protobuf.LazyStringList locations_ = com.google.protobuf.LazyStringArrayList.EMPTY;
private void ensureLocationsIsMutable() {
if (!((bitField0_ & 0x00000002) == 0x00000002)) {
locations_ = new com.google.protobuf.LazyStringArrayList(locations_);
bitField0_ |= 0x00000002;
}
}
/**
* <code>repeated string locations = 2;</code>
*/
public java.util.List<java.lang.String>
getLocationsList() {
return java.util.Collections.unmodifiableList(locations_);
}
/**
* <code>repeated string locations = 2;</code>
*/
public int getLocationsCount() {
return locations_.size();
}
/**
* <code>repeated string locations = 2;</code>
*/
public java.lang.String getLocations(int index) {
return locations_.get(index);
}
/**
* <code>repeated string locations = 2;</code>
*/
public com.google.protobuf.ByteString
getLocationsBytes(int index) {
return locations_.getByteString(index);
}
/**
* <code>repeated string locations = 2;</code>
*/
public Builder setLocations(
int index, java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureLocationsIsMutable();
locations_.set(index, value);
onChanged();
return this;
}
/**
* <code>repeated string locations = 2;</code>
*/
public Builder addLocations(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureLocationsIsMutable();
locations_.add(value);
onChanged();
return this;
}
/**
* <code>repeated string locations = 2;</code>
*/
public Builder addAllLocations(
java.lang.Iterable<java.lang.String> values) {
ensureLocationsIsMutable();
super.addAll(values, locations_);
onChanged();
return this;
}
/**
* <code>repeated string locations = 2;</code>
*/
public Builder clearLocations() {
locations_ = com.google.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000002);
onChanged();
return this;
}
/**
* <code>repeated string locations = 2;</code>
*/
public Builder addLocationsBytes(
com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
ensureLocationsIsMutable();
locations_.add(value);
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:TableSnapshotRegionSplit)
}
static {
defaultInstance = new TableSnapshotRegionSplit(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:TableSnapshotRegionSplit)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_ScanMetrics_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_ScanMetrics_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_TableSnapshotRegionSplit_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_TableSnapshotRegionSplit_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@ -732,9 +1485,11 @@ public final class MapReduceProtos {
static {
java.lang.String[] descriptorData = {
"\n\017MapReduce.proto\032\013HBase.proto\".\n\013ScanMe" +
"trics\022\037\n\007metrics\030\001 \003(\0132\016.NameInt64PairBB" +
"\n*org.apache.hadoop.hbase.protobuf.gener" +
"atedB\017MapReduceProtosH\001\240\001\001"
"trics\022\037\n\007metrics\030\001 \003(\0132\016.NameInt64Pair\"O" +
"\n\030TableSnapshotRegionSplit\022 \n\006region\030\001 \001" +
"(\0132\020.RegionSpecifier\022\021\n\tlocations\030\002 \003(\tB" +
"B\n*org.apache.hadoop.hbase.protobuf.gene" +
"ratedB\017MapReduceProtosH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -747,6 +1502,12 @@ public final class MapReduceProtos {
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanMetrics_descriptor,
new java.lang.String[] { "Metrics", });
internal_static_TableSnapshotRegionSplit_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_TableSnapshotRegionSplit_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_TableSnapshotRegionSplit_descriptor,
new java.lang.String[] { "Region", "Locations", });
return null;
}
};


@ -18,15 +18,18 @@
//This file includes protocol buffers used in MapReduce only.
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "MapReduceProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "HBase.proto";
message ScanMetrics {
repeated NameInt64Pair metrics = 1;
}
message TableSnapshotRegionSplit {
optional RegionSpecifier region = 1;
repeated string locations = 2;
}


@ -38,7 +38,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
public class HDFSBlocksDistribution {
private Map<String,HostAndWeight> hostAndWeights = null;
private long uniqueBlocksTotalWeight = 0;
/**
* Stores the hostname and weight for that hostname.
*
@ -58,7 +58,7 @@ public class HDFSBlocksDistribution {
* Constructor
* @param host the host name
* @param weight the weight
*/
public HostAndWeight(String host, long weight) {
this.host = host;
this.weight = weight;
@ -67,28 +67,28 @@ public class HDFSBlocksDistribution {
/**
* add weight
* @param weight the weight
*/
public void addWeight(long weight) {
this.weight += weight;
}
/**
* @return the host name
*/
public String getHost() {
return host;
}
/**
* @return the weight
*/
public long getWeight() {
return weight;
}
/**
* comparator used to sort hosts based on weight
*/
public static class WeightComparator implements Comparator<HostAndWeight> {
@Override
public int compare(HostAndWeight l, HostAndWeight r) {
@ -99,7 +99,7 @@ public class HDFSBlocksDistribution {
}
}
}
/**
* Constructor
*/
@ -137,12 +137,12 @@ public class HDFSBlocksDistribution {
/**
* add some weight to the total unique weight
* @param weight the weight
*/
private void addUniqueWeight(long weight) {
uniqueBlocksTotalWeight += weight;
}
/**
* add some weight to a specific host
* @param host the host name
@ -186,14 +186,14 @@ public class HDFSBlocksDistribution {
}
return weight;
}
/**
* @return the sum of all unique blocks' weight
*/
public long getUniqueBlocksTotalWeight() {
return uniqueBlocksTotalWeight;
}
/**
* return the locality index of a given host
* @param host the host name
@ -207,8 +207,8 @@ public class HDFSBlocksDistribution {
}
return localityIndex;
}
/**
* This will add the distribution from input to this object
* @param otherBlocksDistribution the other hdfs blocks distribution
@ -223,19 +223,27 @@ public class HDFSBlocksDistribution {
}
addUniqueWeight(otherBlocksDistribution.getUniqueBlocksTotalWeight());
}
/**
* return the sorted list of hosts in terms of their weights
*/
public List<String> getTopHosts() {
NavigableSet<HostAndWeight> orderedHosts = new TreeSet<HostAndWeight>(
new HostAndWeight.WeightComparator());
orderedHosts.addAll(this.hostAndWeights.values());
List<String> topHosts = new ArrayList<String>(orderedHosts.size());
for(HostAndWeight haw : orderedHosts.descendingSet()) {
HostAndWeight[] hostAndWeights = getTopHostsWithWeights();
List<String> topHosts = new ArrayList<String>(hostAndWeights.length);
for(HostAndWeight haw : hostAndWeights) {
topHosts.add(haw.getHost());
}
return topHosts;
}
/**
* return the hosts and their weights, sorted by weight (highest first)
*/
public HostAndWeight[] getTopHostsWithWeights() {
NavigableSet<HostAndWeight> orderedHosts = new TreeSet<HostAndWeight>(
new HostAndWeight.WeightComparator());
orderedHosts.addAll(this.hostAndWeights.values());
return orderedHosts.descendingSet().toArray(new HostAndWeight[orderedHosts.size()]);
}
}
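
The new getTopHostsWithWeights() keeps the weights alongside the host names instead of returning only the sorted names, so callers can make locality decisions without recomputing them. A small sketch of the API (it assumes the class's existing addHostsAndBlockWeight() method; host names and weights are made up):

HDFSBlocksDistribution distribution = new HDFSBlocksDistribution();
distribution.addHostsAndBlockWeight(new String[] { "host1", "host2" }, 100);
distribution.addHostsAndBlockWeight(new String[] { "host1" }, 50);
// hosts come back in descending weight order: host1 (150) first, then host2 (100)
HDFSBlocksDistribution.HostAndWeight[] top = distribution.getTopHostsWithWeights();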


@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.mortbay.log.Log;
/**
* A client scanner for a region opened for read-only on the client side. Assumes region data
* is not changing.
*/
@InterfaceAudience.Private
public class ClientSideRegionScanner extends AbstractClientScanner {
private HRegion region;
private Scan scan;
RegionScanner scanner;
List<Cell> values;
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException {
this.scan = scan;
// region is immutable, set isolation level
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
// open region from the snapshot directory
this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
// create an internal region scanner
this.scanner = region.getScanner(scan);
values = new ArrayList<Cell>();
if (scanMetrics == null) {
initScanMetrics(scan);
} else {
this.scanMetrics = scanMetrics;
}
region.startRegionOperation();
}
@Override
public Result next() throws IOException {
values.clear();
scanner.nextRaw(values, -1); // pass -1 as limit so that we see the whole row.
if (values == null || values.isEmpty()) {
//we are done
return null;
}
Result result = Result.create(values);
if (this.scanMetrics != null) {
long resultSize = 0;
for (Cell kv : values) {
// TODO add getLength to Cell/use CellUtil#estimatedSizeOf
resultSize += KeyValueUtil.ensureKeyValue(kv).getLength();
}
this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
}
return result;
}
@Override
public void close() {
if (this.scanner != null) {
try {
this.scanner.close();
this.scanner = null;
} catch (IOException ex) {
Log.warn("Exception while closing scanner", ex);
}
}
if (this.region != null) {
this.region.closeRegionOperation();
try {
this.region.close(true);
this.region = null;
} catch (IOException ex) {
Log.warn("Exception while closing region", ex);
}
}
}
}


@ -0,0 +1,203 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
/**
* A Scanner which performs a scan over snapshot files. Using this class requires restoring the
* snapshot into a temporary empty directory; only the snapshot reference files are copied into
* that directory, not the actual data files.
*
* <p>
* This also allows one to run the scan from an
* online or offline hbase cluster. The snapshot files can be exported with the
* {@link ExportSnapshot} tool to a pure-hdfs cluster, and this scanner can then be used to
* run the scan directly over the snapshot files. The snapshot should not be deleted while there
* are open scanners reading from snapshot files.
*
* <p>
* An internal RegionScanner is used to execute the {@link Scan} obtained
* from the user for each region in the snapshot.
* <p>
* HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read from
* snapshot files and data files. HBase also enforces security because all the requests are handled
* by the server layer, and the user cannot read from the data files directly. To read the snapshot
* files directly from the file system, the user who is running the MR job must have sufficient
* permissions to access snapshot and reference files. This means that to run mapreduce over
* snapshot files, the job has to be run as the HBase user or the user must have group or other
* privileges in the filesystem (See HBASE-8369). Note that granting other users read access to
* snapshot/data files will completely circumvent the access control enforced by HBase.
* @see TableSnapshotInputFormat
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TableSnapshotScanner extends AbstractClientScanner {
private static final Log LOG = LogFactory.getLog(TableSnapshotScanner.class);
private Configuration conf;
private String snapshotName;
private FileSystem fs;
private Path rootDir;
private Path restoreDir;
private Scan scan;
private ArrayList<HRegionInfo> regions;
private HTableDescriptor htd;
private ClientSideRegionScanner currentRegionScanner = null;
private int currentRegion = -1;
/**
* Creates a TableSnapshotScanner.
* @param conf the configuration
* @param restoreDir a temporary directory to copy the snapshot files into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* The scanner deletes the contents of the directory once the scanner is closed.
* @param snapshotName the name of the snapshot to read from
* @param scan a Scan representing scan parameters
* @throws IOException in case of error
*/
public TableSnapshotScanner(Configuration conf, Path restoreDir,
String snapshotName, Scan scan) throws IOException {
this(conf, new Path(conf.get(HConstants.HBASE_DIR)),
restoreDir, snapshotName, scan);
}
/**
* Creates a TableSnapshotScanner.
* @param conf the configuration
* @param rootDir root directory for HBase.
* @param restoreDir a temporary directory to copy the snapshot files into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* The scanner deletes the contents of the directory once the scanner is closed.
* @param snapshotName the name of the snapshot to read from
* @param scan a Scan representing scan parameters
* @throws IOException in case of error
*/
public TableSnapshotScanner(Configuration conf, Path rootDir,
Path restoreDir, String snapshotName, Scan scan) throws IOException {
this.conf = conf;
this.snapshotName = snapshotName;
this.rootDir = rootDir;
// restoreDir will be deleted in close(), use a unique sub directory
this.restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
this.scan = scan;
this.fs = rootDir.getFileSystem(conf);
init();
}
private void init() throws IOException {
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
//load table descriptor
htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
Set<String> snapshotRegionNames
= SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
if (snapshotRegionNames == null) {
throw new IllegalArgumentException("Snapshot seems empty");
}
regions = new ArrayList<HRegionInfo>(snapshotRegionNames.size());
for (String regionName : snapshotRegionNames) {
// load region descriptor
Path regionDir = new Path(snapshotDir, regionName);
HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs,
regionDir);
if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
hri.getStartKey(), hri.getEndKey())) {
regions.add(hri);
}
}
// sort regions by startKey
Collections.sort(regions);
initScanMetrics(scan);
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs,
rootDir, restoreDir, snapshotName);
}
@Override
public Result next() throws IOException {
Result result = null;
while (true) {
if (currentRegionScanner == null) {
currentRegion++;
if (currentRegion >= regions.size()) {
return null;
}
HRegionInfo hri = regions.get(currentRegion);
currentRegionScanner = new ClientSideRegionScanner(conf, fs,
restoreDir, htd, hri, scan, scanMetrics);
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
}
}
result = currentRegionScanner.next();
if (result != null) {
return result;
} else {
currentRegionScanner.close();
currentRegionScanner = null;
}
}
}
@Override
public void close() {
if (currentRegionScanner != null) {
currentRegionScanner.close();
}
try {
fs.delete(this.restoreDir, true);
} catch (IOException ex) {
LOG.warn("Could not delete restore directory for the snapshot:" + ex);
}
}
}
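
A usage sketch for the TableSnapshotScanner added above (the snapshot name, restore path, and row range are illustrative; conf is assumed to come from HBaseConfiguration.create()):

Scan scan = new Scan(Bytes.toBytes("bbb"), Bytes.toBytes("yyy")); // start and stop row
Path restoreDir = new Path("/tmp/snapshot-restore"); // writable, and not under hbase.rootdir
TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, "mySnapshot", scan);
try {
  for (Result result : scanner) {
    // process each row read directly from the snapshot files
  }
} finally {
  scanner.close(); // also deletes the per-scanner restore sub-directory
}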


@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.cliffc.high_scale_lib.Counter;
import com.google.protobuf.InvalidProtocolBufferException;
@ -122,6 +123,32 @@ public class TableMapReduceUtil {
job, true);
}
/**
* Use this before submitting a TableMap job. It will appropriately set up
* the job.
*
* @param table The table name to read from.
* @param scan The scan instance with the columns, time range etc.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @throws IOException When setting up the details fails.
*/
public static void initTableMapperJob(String table, Scan scan,
Class<? extends TableMapper> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, Job job,
boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
throws IOException {
initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
addDependencyJars, true, inputFormatClass);
}
/**
* Use this before submitting a TableMap job. It will appropriately set up
* the job.
@ -135,13 +162,16 @@ public class TableMapReduceUtil {
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @param initCredentials whether to initialize hbase auth credentials for the job
* @param inputFormatClass the input format
* @throws IOException When setting up the details fails.
*/
public static void initTableMapperJob(String table, Scan scan,
Class<? extends TableMapper> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, Job job,
boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
boolean addDependencyJars, boolean initCredentials,
Class<? extends InputFormat> inputFormatClass)
throws IOException {
job.setInputFormatClass(inputFormatClass);
if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
@ -160,7 +190,9 @@ public class TableMapReduceUtil {
if (addDependencyJars) {
addDependencyJars(job);
}
initCredentials(job);
if (initCredentials) {
initCredentials(job);
}
}
/**
@ -239,6 +271,40 @@ public class TableMapReduceUtil {
outputValueClass, job, addDependencyJars, TableInputFormat.class);
}
/**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and reads directly from snapshot files.
*
* @param snapshotName The name of the snapshot (of a table) to read from.
* @param scan The scan instance with the columns, time range etc.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
*
* @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
* should have write permissions to this directory, and it should not be a subdirectory of
* rootdir. After the job is finished, the restore directory can be deleted.
* @throws IOException When setting up the details fails.
* @see TableSnapshotInputFormat
*/
public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
Class<? extends TableMapper> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, Job job,
boolean addDependencyJars, Path tmpRestoreDir)
throws IOException {
TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
// We would need even more libraries that hbase-server depends on
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Counter.class);
}
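As a job-setup sketch (not part of the patch): the snapshot name, restore directory, and the MyTableMapper/output classes below are illustrative assumptions.
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "snapshot-scan");                 // arbitrary job name
Scan scan = new Scan();                                   // configure columns/time range as needed
Path restoreDir = new Path("/tmp/snapshot_restore");      // must not be under hbase.rootdir
TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", scan,
    MyTableMapper.class, Text.class, IntWritable.class, job, true, restoreDir);
job.setNumReduceTasks(0);
job.waitForCompletion(true);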
/**
* Use this before submitting a Multi TableMap job. It will appropriately set
* up the job.

View File

@ -35,11 +35,9 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
import org.apache.hadoop.util.StringUtils;
/**
@ -97,7 +95,7 @@ public class TableRecordReaderImpl {
* @return The getCounter method or null if not available.
* @throws IOException
*/
private Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
throws IOException {
Method m = null;
try {
@ -253,11 +251,6 @@ public class TableRecordReaderImpl {
* @throws IOException
*/
private void updateCounters() throws IOException {
// we can get access to counters only if hbase uses new mapreduce APIs
if (this.getCounter == null) {
return;
}
byte[] serializedMetrics = currentScan.getAttribute(
Scan.SCAN_ATTRIBUTES_METRICS_DATA);
if (serializedMetrics == null || serializedMetrics.length == 0 ) {
@ -266,16 +259,25 @@ public class TableRecordReaderImpl {
ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(serializedMetrics);
updateCounters(scanMetrics, numRestarts, getCounter, context);
}
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
Method getCounter, TaskAttemptContext context) {
// we can get access to counters only if hbase uses new mapreduce APIs
if (getCounter == null) {
return;
}
try {
for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
Counter ct = (Counter)this.getCounter.invoke(context,
Counter ct = (Counter)getCounter.invoke(context,
HBASE_COUNTER_GROUP_NAME, entry.getKey());
ct.increment(entry.getValue());
}
((Counter) this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
"NUM_SCANNER_RESTARTS")).increment(numRestarts);
((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
"NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
} catch (Exception e) {
LOG.debug("can't update counter." + StringUtils.stringifyException(e));
}

View File

@ -0,0 +1,406 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
/**
* TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
* bypasses HBase servers and accesses the underlying files (hfiles, recovered edits,
* hlogs, etc) directly to provide maximum performance. The snapshot is not required to be
* restored to the live cluster or cloned. This also allows the mapreduce job to be run
* against an online or offline hbase cluster. The snapshot files can be exported to a
* pure-hdfs cluster with the {@link ExportSnapshot} tool, and this InputFormat can then be used
* to run the mapreduce job directly over the snapshot files. The snapshot should not be deleted
* while there are jobs reading from the snapshot files.
* <p>
* Usage is similar to TableInputFormat, and
* {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
* can be used to configure the job (restoreDir below is a temporary directory that the snapshot
* is restored into before the job runs):
* <pre>{@code
* Job job = new Job(conf);
* Scan scan = new Scan();
* TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
*      scan, MyTableMapper.class, MyMapKeyOutput.class,
*      MyMapOutputValueWritable.class, job, true, restoreDir);
* }
* </pre>
* <p>
* Internally, this input format restores the snapshot into the given tmp directory. Similar to
* {@link TableInputFormat} an InputSplit is created per region. The region is opened for reading
* from each RecordReader. An internal RegionScanner is used to execute the {@link Scan} obtained
* from the user.
* <p>
* HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read from
* snapshot files and data files. Normally HBase enforces security at the server layer, so users
* cannot read from the data files directly. To read snapshot files directly from the file system,
* the user running the MR job must have sufficient permissions to access the snapshot and
* reference files. This means that to run mapreduce over snapshot files, the MR job has to be run
* as the HBase user, or the user must have group or other privileges in the filesystem
* (see HBASE-8369). Note that granting other users read access to the snapshot/data files
* completely circumvents the access control enforced by HBase.
* @see TableSnapshotScanner
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
// TODO: Snapshot files are owned in the fs by the hbase user. There is no
// easy way to delegate access.
private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class);
/** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
private static final String LOCALITY_CUTOFF_MULTIPLIER = "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";
public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
private String regionName;
private String[] locations;
// constructor for mapreduce framework / Writable
public TableSnapshotRegionSplit() { }
TableSnapshotRegionSplit(String regionName, List<String> locations) {
this.regionName = regionName;
if (locations == null || locations.isEmpty()) {
this.locations = new String[0];
} else {
this.locations = locations.toArray(new String[locations.size()]);
}
}
@Override
public long getLength() throws IOException, InterruptedException {
//TODO: We can obtain the file sizes of the snapshot here.
return 0;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
return locations;
}
// TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
// doing this wrapping with Writables.
@Override
public void write(DataOutput out) throws IOException {
MapReduceProtos.TableSnapshotRegionSplit.Builder builder =
MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
.setRegion(RegionSpecifier.newBuilder()
.setType(RegionSpecifierType.ENCODED_REGION_NAME)
.setValue(ByteString.copyFrom(Bytes.toBytes(regionName))).build());
for (String location : locations) {
builder.addLocations(location);
}
MapReduceProtos.TableSnapshotRegionSplit split = builder.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
split.writeTo(baos);
baos.close();
byte[] buf = baos.toByteArray();
out.writeInt(buf.length);
out.write(buf);
}
@Override
public void readFields(DataInput in) throws IOException {
int len = in.readInt();
byte[] buf = new byte[len];
in.readFully(buf);
MapReduceProtos.TableSnapshotRegionSplit split = MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf);
this.regionName = Bytes.toString(split.getRegion().getValue().toByteArray());
List<String> locationsList = split.getLocationsList();
this.locations = locationsList.toArray(new String[locationsList.size()]);
}
}
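A round-trip sketch of the Writable serialization above, using Hadoop's in-memory buffers; the region name and host names are hypothetical:
TableSnapshotRegionSplit split =
    new TableSnapshotRegionSplit("abcdef0123456789", Arrays.asList("host1", "host2"));
DataOutputBuffer out = new DataOutputBuffer();
split.write(out);                                  // serializes the wrapped protobuf message
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), out.getLength());
TableSnapshotRegionSplit copy = new TableSnapshotRegionSplit();
copy.readFields(in);                               // restores regionName and locations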
@VisibleForTesting
class TableSnapshotRegionRecordReader extends RecordReader<ImmutableBytesWritable, Result> {
private TableSnapshotRegionSplit split;
private Scan scan;
private Result result = null;
private ImmutableBytesWritable row = null;
private ClientSideRegionScanner scanner;
private TaskAttemptContext context;
private Method getCounter;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
Configuration conf = context.getConfiguration();
this.split = (TableSnapshotRegionSplit) split;
String regionName = this.split.regionName;
String snapshotName = getSnapshotName(conf);
Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
FileSystem fs = rootDir.getFileSystem(conf);
// the user-specified root directory where the snapshot was restored
Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY));
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
//load table descriptor
HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
//load region descriptor
Path regionDir = new Path(snapshotDir, regionName);
HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
// create scan
String scanStr = conf.get(TableInputFormat.SCAN);
if (scanStr == null) {
throw new IllegalArgumentException("A Scan is not configured for this job");
}
scan = TableMapReduceUtil.convertStringToScan(scanStr);
// the region is immutable, so READ_UNCOMMITTED is fine; otherwise we would have to set
// the thread read point
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
if (context != null) {
this.context = context;
getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
}
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
result = scanner.next();
if (result == null) {
//we are done
return false;
}
if (this.row == null) {
this.row = new ImmutableBytesWritable();
}
this.row.set(result.getRow());
ScanMetrics scanMetrics = scanner.getScanMetrics();
if (scanMetrics != null && context != null) {
TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
}
return true;
}
@Override
public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
return row;
}
@Override
public Result getCurrentValue() throws IOException, InterruptedException {
return result;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0; // TODO: use total bytes to estimate
}
@Override
public void close() throws IOException {
if (this.scanner != null) {
this.scanner.close();
}
}
}
@Override
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new TableSnapshotRegionRecordReader();
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
String snapshotName = getSnapshotName(conf);
Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
FileSystem fs = rootDir.getFileSystem(conf);
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
Set<String> snapshotRegionNames
= SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
if (snapshotRegionNames == null) {
throw new IllegalArgumentException("Snapshot seems empty");
}
// load table descriptor
HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs,
snapshotDir);
Scan scan = TableMapReduceUtil.convertStringToScan(conf
.get(TableInputFormat.SCAN));
Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
List<InputSplit> splits = new ArrayList<InputSplit>();
for (String regionName : snapshotRegionNames) {
// load region descriptor
Path regionDir = new Path(snapshotDir, regionName);
HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs,
regionDir);
if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
hri.getStartKey(), hri.getEndKey())) {
// compute HDFS locations from snapshot files (which will get the locations for
// referred hfiles)
List<String> hosts = getBestLocations(conf,
HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
int len = Math.min(3, hosts.size());
hosts = hosts.subList(0, len);
splits.add(new TableSnapshotRegionSplit(regionName, hosts));
}
}
return splits;
}
/**
* This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
* weights into account, so they will treat every location passed from the input split as equal. We
* do not want to blindly pass all the locations, since we are creating one split per region, and
* the region's blocks are distributed throughout the cluster unless favored node assignment
* is used. In the expected stable case, only one location will contain most of the blocks as local.
* With favored node assignment, on the other hand, three nodes will contain highly local blocks.
* Here we apply a simple heuristic: we pass all hosts which have at least 80%
* (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) of the block locality of the host
* with the best locality.
*/
@VisibleForTesting
List<String> getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution) {
List<String> locations = new ArrayList<String>(3);
HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
if (hostAndWeights.length == 0) {
return locations;
}
HostAndWeight topHost = hostAndWeights[0];
locations.add(topHost.getHost());
// Heuristic: include every host whose block weight is at least cutoffMultiplier times the top host's weight
double cutoffMultiplier
= conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
double filterWeight = topHost.getWeight() * cutoffMultiplier;
for (int i = 1; i < hostAndWeights.length; i++) {
if (hostAndWeights[i].getWeight() >= filterWeight) {
locations.add(hostAndWeights[i].getHost());
} else {
break;
}
}
return locations;
}
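A small illustration of the heuristic under hypothetical block weights (getBestLocations is package-private, so the call is assumed to be made from the same package; conf is an ordinary Configuration):
HDFSBlocksDistribution dist = new HDFSBlocksDistribution();
dist.addHostsAndBlockWeight(new String[] {"host1"}, 1000);   // best locality
dist.addHostsAndBlockWeight(new String[] {"host2"}, 850);    // >= 0.8 * 1000, kept
dist.addHostsAndBlockWeight(new String[] {"host3"}, 200);    // below the cutoff, dropped
List<String> hosts = new TableSnapshotInputFormat().getBestLocations(conf, dist);
// hosts is expected to be ["host1", "host2"]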
/**
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
* @param job the job to configure
* @param snapshotName the name of the snapshot to read from
* @param restoreDir a temporary directory to restore the snapshot into. The current user should
* have write permissions to this directory, and it should not be a subdirectory of rootdir.
* After the job is finished, restoreDir can be deleted.
* @throws IOException if an error occurs
*/
public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
Configuration conf = job.getConfiguration();
conf.set(SNAPSHOT_NAME_KEY, snapshotName);
Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
FileSystem fs = rootDir.getFileSystem(conf);
restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
// TODO: restore from record readers to parallelize.
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
conf.set(TABLE_DIR_KEY, restoreDir.toString());
}
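For completeness, a sketch of wiring the input format without TableMapReduceUtil; the snapshot name and restore path are illustrative, and the record reader expects a serialized Scan under TableInputFormat.SCAN:
Job job = new Job(conf, "snapshot-mr");                      // hypothetical job
job.setInputFormatClass(TableSnapshotInputFormat.class);
job.getConfiguration().set(TableInputFormat.SCAN,
    TableMapReduceUtil.convertScanToString(new Scan()));
TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));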
private static String getSnapshotName(Configuration conf) {
String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
if (snapshotName == null) {
throw new IllegalArgumentException("Snapshot name must be provided");
}
return snapshotName;
}
}

View File

@ -759,8 +759,23 @@ public class HRegion implements HeapSize { // , Writable{
*/
public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
}
/**
* This is a helper function to compute HDFS block distribution on demand
* @param conf configuration
* @param tableDescriptor HTableDescriptor of the table
* @param regionInfo the region info of the region to compute the distribution for
* @param tablePath the table directory
* @return The HDFS blocks distribution for the given region.
* @throws IOException
*/
public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
throws IOException {
HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
FileSystem fs = tablePath.getFileSystem(conf);
HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
@ -4014,11 +4029,36 @@ public class HRegion implements HeapSize { // , Writable{
final HLog hlog,
final boolean initialize, final boolean ignoreHLog)
throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
}
/**
* Convenience method creating new HRegions. Used by createTable.
* The {@link HLog} for the created region needs to be closed
* explicitly, if it is not null.
* Use {@link HRegion#getLog()} to get access.
*
* @param info Info for region to create.
* @param rootDir Root directory for HBase instance
* @param tableDir table directory
* @param conf the configuration to use
* @param hTableDescriptor the table descriptor
* @param hlog shared HLog
* @param initialize - true to initialize the region
* @param ignoreHLog - true to skip generating a new hlog if it is null, mostly for createTable
* @return new HRegion
* @throws IOException
*/
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
final HLog hlog,
final boolean initialize, final boolean ignoreHLog)
throws IOException {
LOG.info("creating HRegion " + info.getTable().getNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
" Table name == " + info.getTable().getNameAsString());
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
FileSystem fs = FileSystem.get(conf);
HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
HLog effectiveHLog = hlog;
@ -4176,15 +4216,39 @@ public class HRegion implements HeapSize { // , Writable{
final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
final RegionServerServices rsServices, final CancelableProgressable reporter)
throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
}
/**
* Open a Region.
* @param conf The Configuration object to use.
* @param fs Filesystem to use
* @param rootDir Root directory for HBase instance
* @param tableDir the table directory
* @param info Info for region to be opened.
* @param htd the table descriptor
* @param wal HLog for region to use. This method will call
* HLog#setSequenceNumber(long) passing the result of the call to
* HRegion#getMinSequenceId() to ensure the log id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param rsServices An interface we can request flushes against.
* @param reporter An interface we can report progress against.
* @return new HRegion
* @throws IOException
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
final RegionServerServices rsServices, final CancelableProgressable reporter)
throws IOException {
if (info == null) throw new NullPointerException("Passed region info is null");
if (LOG.isDebugEnabled()) {
LOG.debug("Opening region: " + info);
}
Path dir = FSUtils.getTableDir(rootDir, info.getTable());
HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info, htd, rsServices);
HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
return r.openHRegion(reporter);
}
/**
* Useful when reopening a closed region (normally for unit tests)
* @param other original object

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.snapshot;
import java.io.InputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
@ -37,23 +37,24 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
@ -471,8 +472,9 @@ public class RestoreSnapshotHelper {
}
// create the regions on disk
ModifyRegionUtils.createRegions(conf, rootDir,
ModifyRegionUtils.createRegions(conf, rootDir, tableDir,
tableDesc, clonedRegionsInfo, new ModifyRegionUtils.RegionFillTask() {
@Override
public void fillRegion(final HRegion region) throws IOException {
cloneRegion(region, snapshotRegions.get(region.getRegionInfo().getEncodedName()));
}
@ -499,6 +501,7 @@ public class RestoreSnapshotHelper {
final String tableName = tableDesc.getTableName().getNameAsString();
SnapshotReferenceUtil.visitRegionStoreFiles(fs, snapshotRegionDir,
new FSVisitor.StoreFileVisitor() {
@Override
public void storeFile (final String region, final String family, final String hfile)
throws IOException {
LOG.info("Adding HFileLink " + hfile + " to table=" + tableName);
@ -627,10 +630,13 @@ public class RestoreSnapshotHelper {
private void restoreWALs() throws IOException {
final SnapshotLogSplitter logSplitter = new SnapshotLogSplitter(conf, fs, tableDir,
snapshotTable, regionsMap);
// TODO: use executors to parallelize splitting
// TODO: once split, we do not need to split again for other restores
try {
// Recover.Edits
SnapshotReferenceUtil.visitRecoveredEdits(fs, snapshotDir,
new FSVisitor.RecoveredEditsVisitor() {
@Override
public void recoveredEdits (final String region, final String logfile) throws IOException {
Path path = SnapshotReferenceUtil.getRecoveredEdits(snapshotDir, region, logfile);
logSplitter.splitRecoveredEdit(path);
@ -639,6 +645,7 @@ public class RestoreSnapshotHelper {
// Region Server Logs
SnapshotReferenceUtil.visitLogFiles(fs, snapshotDir, new FSVisitor.LogFileVisitor() {
@Override
public void logFile (final String server, final String logfile) throws IOException {
logSplitter.splitLog(server, logfile);
}
@ -689,4 +696,45 @@ public class RestoreSnapshotHelper {
}
return htd;
}
/**
* Copy the snapshot files for a snapshot scanner, discarding meta changes.
* @param conf the configuration to use
* @param fs the filesystem holding the snapshot and restore directories
* @param rootDir the HBase root directory
* @param restoreDir the directory to restore the snapshot into
* @param snapshotName the name of the snapshot to copy
* @throws IOException
*/
public static void copySnapshotForScanner(Configuration conf, FileSystem fs, Path rootDir,
Path restoreDir, String snapshotName) throws IOException {
// ensure that the restore directory is on the same filesystem as the root directory
if (!restoreDir.getFileSystem(conf).getUri().equals(rootDir.getFileSystem(conf).getUri())) {
throw new IllegalArgumentException("Filesystems for restore directory and HBase root directory " +
"should be the same");
}
// ensure that the restore directory is not a subdirectory of the root directory
if (restoreDir.toUri().getPath().startsWith(rootDir.toUri().getPath())) {
throw new IllegalArgumentException("Restore directory cannot be a sub directory of HBase " +
"root directory. RootDir: " + rootDir + ", restoreDir: " + restoreDir);
}
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
//load table descriptor
HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
MonitoredTask status = TaskMonitor.get().createStatus(
"Restoring snapshot '" + snapshotName + "' to directory " + restoreDir);
ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher();
RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs, snapshotDesc,
snapshotDir, htd, restoreDir, monitor, status);
helper.restoreHdfsRegions(); // TODO: parallelize.
if (LOG.isDebugEnabled()) {
LOG.debug("Restored table dir:" + restoreDir);
FSUtils.logFileSystemState(fs, restoreDir, LOG);
}
}
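A short usage sketch of this helper; the snapshot name and restore path are illustrative assumptions:
Configuration conf = HBaseConfiguration.create();
Path rootDir = FSUtils.getRootDir(conf);
FileSystem fs = rootDir.getFileSystem(conf);
Path restoreDir = new Path("/tmp/snapshot_restore/" + UUID.randomUUID());
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, "my_snapshot");
// scan the restored region files, then delete restoreDir when done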
}

View File

@ -54,7 +54,7 @@ public abstract class AbstractHBaseTool implements Tool {
protected Configuration conf = null;
private static final Set<String> requiredOptions = new TreeSet<String>();
protected String[] cmdLineArgs = null;
/**
@ -151,6 +151,11 @@ public abstract class AbstractHBaseTool implements Tool {
addOptWithArg(opt, description);
}
protected void addRequiredOptWithArg(String shortOpt, String longOpt, String description) {
requiredOptions.add(longOpt);
addOptWithArg(shortOpt, longOpt, description);
}
protected void addOptNoArg(String opt, String description) {
options.addOption(opt, false, description);
}

View File

@ -84,6 +84,26 @@ public abstract class ModifyRegionUtils {
public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
final RegionFillTask task) throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, hTableDescriptor.getTableName());
return createRegions(conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
}
/**
* Create new set of regions on the specified file-system.
* NOTE: you should add the regions to hbase:meta after this operation.
*
* @param conf {@link Configuration}
* @param rootDir Root directory for HBase instance
* @param tableDir table directory
* @param hTableDescriptor description of the table
* @param newRegions {@link HRegionInfo} that describes the regions to create
* @param task {@link RegionFillTask} custom code to populate region after creation
* @throws IOException
*/
public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
final RegionFillTask task) throws IOException {
if (newRegions == null) return null;
int regionNumber = newRegions.length;
ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
@ -93,26 +113,14 @@ public abstract class ModifyRegionUtils {
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
for (final HRegionInfo newRegion : newRegions) {
completionService.submit(new Callable<HRegionInfo>() {
@Override
public HRegionInfo call() throws IOException {
// 1. Create HRegion
HRegion region = HRegion.createHRegion(newRegion,
rootDir, conf, hTableDescriptor, null,
false, true);
try {
// 2. Custom user code to interact with the created region
if (task != null) {
task.fillRegion(region);
}
} finally {
// 3. Close the new region to flush to disk. Close log file too.
region.close();
}
return region.getRegionInfo();
return createRegion(conf, rootDir, tableDir, hTableDescriptor, newRegion, task);
}
});
}
try {
// 4. wait for all regions to finish creation
// wait for all regions to finish creation
for (int i = 0; i < regionNumber; i++) {
Future<HRegionInfo> future = completionService.take();
HRegionInfo regionInfo = future.get();
@ -129,6 +137,35 @@ public abstract class ModifyRegionUtils {
return regionInfos;
}
/**
* Create a new region on the specified file-system.
* @param conf {@link Configuration}
* @param rootDir Root directory for HBase instance
* @param tableDir table directory
* @param hTableDescriptor description of the table
* @param newRegion {@link HRegionInfo} that describes the region to create
* @param task {@link RegionFillTask} custom code to populate region after creation
* @throws IOException
*/
public static HRegionInfo createRegion(final Configuration conf, final Path rootDir,
final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion,
final RegionFillTask task) throws IOException {
// 1. Create HRegion
HRegion region = HRegion.createHRegion(newRegion,
rootDir, tableDir, conf, hTableDescriptor, null,
false, true);
try {
// 2. Custom user code to interact with the created region
if (task != null) {
task.fillRegion(region);
}
} finally {
// 3. Close the new region to flush to disk. Close log file too.
region.close();
}
return region.getRegionInfo();
}
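A brief calling sketch, assuming an existing Configuration conf and HTableDescriptor htd; the single-region HRegionInfo is illustrative:
Path rootDir = FSUtils.getRootDir(conf);
Path tableDir = FSUtils.getTableDir(rootDir, htd.getTableName());
HRegionInfo hri = new HRegionInfo(htd.getTableName());   // one region spanning the whole key space
HRegionInfo created = ModifyRegionUtils.createRegion(conf, rootDir, tableDir, htd, hri, null);
// passing null for the RegionFillTask leaves the new region empty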
/*
* used by createRegions() to get the thread pool executor based on the
* "hbase.hregion.open.and.init.threads.max" property.
@ -142,6 +179,7 @@ public abstract class ModifyRegionUtils {
new ThreadFactory() {
private int count = 1;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadNamePrefix + "-" + count++);
return t;

View File

@ -162,6 +162,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* mini dfs.
* @deprecated can be used only with mini dfs
*/
@Deprecated
private static final String TEST_DIRECTORY_KEY = "test.build.data";
/** Filesystem URI used for map-reduce mini-cluster setup */
@ -1625,24 +1626,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* @throws IOException
*/
public int loadTable(final HTable t, final byte[] f) throws IOException {
t.setAutoFlush(false, true);
byte[] k = new byte[3];
int rowCount = 0;
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
for (byte b2 = 'a'; b2 <= 'z'; b2++) {
for (byte b3 = 'a'; b3 <= 'z'; b3++) {
k[0] = b1;
k[1] = b2;
k[2] = b3;
Put put = new Put(k);
put.add(f, null, k);
t.put(put);
rowCount++;
}
}
}
t.flushCommits();
return rowCount;
return loadTable(t, new byte[][] {f});
}
/**
@ -1653,28 +1637,83 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* @throws IOException
*/
public int loadTable(final HTable t, final byte[][] f) throws IOException {
t.setAutoFlush(false, true);
byte[] k = new byte[3];
return loadTable(t, f, null);
}
/**
* Load table of multiple column families with rows from 'aaa' to 'zzz'.
* @param t Table
* @param f Array of Families to load
* @param value the values of the cells. If null is passed, the row key is used as value
* @return Count of rows loaded.
* @throws IOException
*/
public int loadTable(final HTable t, final byte[][] f, byte[] value) throws IOException {
t.setAutoFlush(false);
int rowCount = 0;
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
for (byte b2 = 'a'; b2 <= 'z'; b2++) {
for (byte b3 = 'a'; b3 <= 'z'; b3++) {
k[0] = b1;
k[1] = b2;
k[2] = b3;
Put put = new Put(k);
for (int i = 0; i < f.length; i++) {
put.add(f[i], null, k);
}
t.put(put);
rowCount++;
}
for (byte[] row : HBaseTestingUtility.ROWS) {
Put put = new Put(row);
for (int i = 0; i < f.length; i++) {
put.add(f[i], null, value != null ? value : row);
}
t.put(put);
rowCount++;
}
t.flushCommits();
return rowCount;
}
/** A tracker for validating table rows
* generated with {@link HBaseTestingUtility#loadTable(HTable, byte[])}
*/
public static class SeenRowTracker {
int dim = 'z' - 'a' + 1;
int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
byte[] startRow;
byte[] stopRow;
public SeenRowTracker(byte[] startRow, byte[] stopRow) {
this.startRow = startRow;
this.stopRow = stopRow;
}
void reset() {
for (byte[] row : ROWS) {
seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
}
}
int i(byte b) {
return b - 'a';
}
public void addRow(byte[] row) {
seenRows[i(row[0])][i(row[1])][i(row[2])]++;
}
/** Validate that all the rows between startRow and stopRow are seen exactly once, and
* that all other rows are not seen at all
*/
public void validate() {
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
for (byte b2 = 'a'; b2 <= 'z'; b2++) {
for (byte b3 = 'a'; b3 <= 'z'; b3++) {
int count = seenRows[i(b1)][i(b2)][i(b3)];
int expectedCount = 0;
if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
&& Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
expectedCount = 1;
}
if (count != expectedCount) {
String row = new String(new byte[] {b1,b2,b3});
throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount);
}
}
}
}
}
}
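A usage sketch mirroring the snapshot tests in this patch; scanner is assumed to be a ResultScanner limited to the [bbb, yyy) range:
HBaseTestingUtility.SeenRowTracker rowTracker =
    new HBaseTestingUtility.SeenRowTracker(Bytes.toBytes("bbb"), Bytes.toBytes("yyy"));
Result result;
while ((result = scanner.next()) != null) {
  rowTracker.addRow(result.getRow());
}
rowTracker.validate();   // throws if a row in range was missed or an out-of-range row was seen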
public int loadRegion(final HRegion r, final byte[] f) throws IOException {
return loadRegion(r, f, false);
}
@ -1786,6 +1825,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return createMultiRegions(getConfiguration(), table, columnFamily);
}
/** All the row values for the data loaded by {@link #loadTable(HTable, byte[])} */
public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
static {
int i = 0;
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
for (byte b2 = 'a'; b2 <= 'z'; b2++) {
for (byte b3 = 'a'; b3 <= 'z'; b3++) {
ROWS[i][0] = b1;
ROWS[i][1] = b2;
ROWS[i][2] = b3;
i++;
}
}
}
}
public static final byte[][] KEYS = {
HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
@ -3218,6 +3273,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
};
}
/**
* Returns a {@link Predicate} for checking that table is enabled
*/
public Waiter.Predicate<Exception> predicateTableEnabled(final TableName tableName) {
return new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return getHBaseAdmin().isTableEnabled(tableName);
}
};
}
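A hedged usage sketch, assuming the Waiter.waitFor(Configuration, long, Predicate) helper; util and the table name are illustrative:
Waiter.waitFor(util.getConfiguration(), 60000,
    util.predicateTableEnabled(TableName.valueOf("my_table")));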
/**
* Create a set of column descriptors with the combination of compression,
* encoding, bloom codecs available.

View File

@ -0,0 +1,393 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Stopwatch;
/**
* A simple performance evaluation tool for single client and MR scans
* and snapshot scans.
*/
public class ScanPerformanceEvaluation extends AbstractHBaseTool {
private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
private String type;
private String file;
private String tablename;
private String snapshotName;
private String restoreDir;
private String caching;
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
Path rootDir;
try {
rootDir = FSUtils.getRootDir(conf);
rootDir.getFileSystem(conf);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@Override
protected void addOptions() {
this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
this.addOptWithArg("f", "file", "the filename to read from");
this.addOptWithArg("tn", "table", "the tablename to read from");
this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
this.addOptWithArg("ch", "caching", "scanner caching value");
}
@Override
protected void processOptions(CommandLine cmd) {
type = cmd.getOptionValue("type");
file = cmd.getOptionValue("file");
tablename = cmd.getOptionValue("table");
snapshotName = cmd.getOptionValue("snapshot");
restoreDir = cmd.getOptionValue("restoredir");
caching = cmd.getOptionValue("caching");
}
protected void testHdfsStreaming(Path filename) throws IOException {
byte[] buf = new byte[1024];
FileSystem fs = filename.getFileSystem(getConf());
// read the file from start to finish
Stopwatch fileOpenTimer = new Stopwatch();
Stopwatch streamTimer = new Stopwatch();
fileOpenTimer.start();
FSDataInputStream in = fs.open(filename);
fileOpenTimer.stop();
long totalBytes = 0;
streamTimer.start();
while (true) {
int read = in.read(buf);
if (read < 0) {
break;
}
totalBytes += read;
}
streamTimer.stop();
double throughput = (double)totalBytes / streamTimer.elapsedTime(TimeUnit.SECONDS);
System.out.println("HDFS streaming: ");
System.out.println("total time to open: " + fileOpenTimer.elapsedMillis() + " ms");
System.out.println("total time to read: " + streamTimer.elapsedMillis() + " ms");
System.out.println("total bytes: " + totalBytes + " bytes ("
+ StringUtils.humanReadableInt(totalBytes) + ")");
System.out.println("throghput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
}
private Scan getScan() {
Scan scan = new Scan(); // default scan settings
scan.setCacheBlocks(false);
scan.setMaxVersions(1);
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
if (caching != null) {
scan.setCaching(Integer.parseInt(caching));
}
return scan;
}
public void testScan() throws IOException {
Stopwatch tableOpenTimer = new Stopwatch();
Stopwatch scanOpenTimer = new Stopwatch();
Stopwatch scanTimer = new Stopwatch();
tableOpenTimer.start();
HTable table = new HTable(getConf(), TableName.valueOf(tablename));
tableOpenTimer.stop();
Scan scan = getScan();
scanOpenTimer.start();
ResultScanner scanner = table.getScanner(scan);
scanOpenTimer.stop();
long numRows = 0;
long numCells = 0;
scanTimer.start();
while (true) {
Result result = scanner.next();
if (result == null) {
break;
}
numRows++;
numCells += result.rawCells().length;
}
scanTimer.stop();
scanner.close();
table.close();
ScanMetrics metrics = ProtobufUtil.toScanMetrics(scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
long totalBytes = metrics.countOfBytesInResults.get();
double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
System.out.println("HBase scan: ");
System.out.println("total time to open table: " + tableOpenTimer.elapsedMillis() + " ms");
System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
System.out.println("total bytes: " + totalBytes + " bytes ("
+ StringUtils.humanReadableInt(totalBytes) + ")");
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
System.out.println("total rows : " + numRows);
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
System.out.println("total cells : " + numCells);
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
}
public void testSnapshotScan() throws IOException {
Stopwatch snapshotRestoreTimer = new Stopwatch();
Stopwatch scanOpenTimer = new Stopwatch();
Stopwatch scanTimer = new Stopwatch();
Path restoreDir = new Path(this.restoreDir);
snapshotRestoreTimer.start();
restoreDir.getFileSystem(conf).delete(restoreDir, true);
snapshotRestoreTimer.stop();
Scan scan = getScan();
scanOpenTimer.start();
TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
scanOpenTimer.stop();
long numRows = 0;
long numCells = 0;
scanTimer.start();
while (true) {
Result result = scanner.next();
if (result == null) {
break;
}
numRows++;
numCells += result.rawCells().length;
}
scanTimer.stop();
scanner.close();
ScanMetrics metrics = scanner.getScanMetrics();
long totalBytes = metrics.countOfBytesInResults.get();
double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
System.out.println("HBase scan snapshot: ");
System.out.println("total time to restore snapshot: " + snapshotRestoreTimer.elapsedMillis() + " ms");
System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
System.out.println("total bytes: " + totalBytes + " bytes ("
+ StringUtils.humanReadableInt(totalBytes) + ")");
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
System.out.println("total rows : " + numRows);
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
System.out.println("total cells : " + numCells);
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
}
public static enum ScanCounter {
NUM_ROWS,
NUM_CELLS,
}
public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException,
InterruptedException {
context.getCounter(ScanCounter.NUM_ROWS).increment(1);
context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
}
}
public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
Stopwatch scanOpenTimer = new Stopwatch();
Stopwatch scanTimer = new Stopwatch();
Scan scan = getScan();
String jobName = "testScanMapReduce";
Job job = new Job(conf);
job.setJobName(jobName);
job.setJarByClass(getClass());
TableMapReduceUtil.initTableMapperJob(
this.tablename,
scan,
MyMapper.class,
NullWritable.class,
NullWritable.class,
job
);
job.setNumReduceTasks(0);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(NullOutputFormat.class);
scanTimer.start();
job.waitForCompletion(true);
scanTimer.stop();
Counters counters = job.getCounters();
long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
System.out.println("HBase scan mapreduce: ");
System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
System.out.println("total bytes: " + totalBytes + " bytes ("
+ StringUtils.humanReadableInt(totalBytes) + ")");
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
System.out.println("total rows : " + numRows);
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
System.out.println("total cells : " + numCells);
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
}
public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
Stopwatch scanOpenTimer = new Stopwatch();
Stopwatch scanTimer = new Stopwatch();
Scan scan = getScan();
String jobName = "testSnapshotScanMapReduce";
Job job = new Job(conf);
job.setJobName(jobName);
job.setJarByClass(getClass());
TableMapReduceUtil.initTableSnapshotMapperJob(
this.snapshotName,
scan,
MyMapper.class,
NullWritable.class,
NullWritable.class,
job,
true,
new Path(restoreDir)
);
job.setNumReduceTasks(0);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(NullOutputFormat.class);
scanTimer.start();
job.waitForCompletion(true);
scanTimer.stop();
Counters counters = job.getCounters();
long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
System.out.println("HBase scan mapreduce: ");
System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
System.out.println("total bytes: " + totalBytes + " bytes ("
+ StringUtils.humanReadableInt(totalBytes) + ")");
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
System.out.println("total rows : " + numRows);
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
System.out.println("total cells : " + numCells);
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
}
@Override
protected int doWork() throws Exception {
if (type.equals("streaming")) {
testHdfsStreaming(new Path(file));
} else if (type.equals("scan")){
testScan();
} else if (type.equals("snapshotscan")) {
testSnapshotScan();
} else if (type.equals("scanmapreduce")) {
testScanMapReduce();
} else if (type.equals("snapshotscanmapreduce")) {
testSnapshotScanMapReduce();
}
return 0;
}
public static void main (String[] args) throws Exception {
int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
System.exit(ret);
}
}

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestTableSnapshotScanner {
private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final int NUM_REGION_SERVERS = 2;
private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
public static byte[] bbb = Bytes.toBytes("bbb");
public static byte[] yyy = Bytes.toBytes("yyy");
private FileSystem fs;
private Path rootDir;
public void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniCluster(NUM_REGION_SERVERS);
rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
fs = rootDir.getFileSystem(UTIL.getConfiguration());
}
public void tearDownCluster() throws Exception {
UTIL.shutdownMiniCluster();
}
private static void setupConf(Configuration conf) {
// Enable snapshot
conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
}
@After
public void tearDown() throws Exception {
}
public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
String snapshotName, int numRegions)
throws Exception {
try {
util.deleteTable(tableName);
} catch(Exception ex) {
// ignore
}
if (numRegions > 1) {
util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
} else {
util.createTable(tableName, FAMILIES);
}
HBaseAdmin admin = util.getHBaseAdmin();
// put some stuff in the table
HTable table = new HTable(util.getConfiguration(), tableName);
util.loadTable(table, FAMILIES);
Path rootDir = new Path(util.getConfiguration().get(HConstants.HBASE_DIR));
FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
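// Load a second round of data with different values after the snapshot is taken, so the
// tests can verify that scanning the snapshot returns only snapshot-time data.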
// load different values
byte[] value = Bytes.toBytes("after_snapshot_value");
util.loadTable(table, FAMILIES, value);
// cause flush to create new files in the region
admin.flush(tableName.toString());
table.close();
}
@Test
public void testWithSingleRegion() throws Exception {
testScanner(UTIL, "testWithSingleRegion", 1, false);
}
@Test
public void testWithMultiRegion() throws Exception {
testScanner(UTIL, "testWithMultiRegion", 10, false);
}
@Test
public void testWithOfflineHBaseMultiRegion() throws Exception {
testScanner(UTIL, "testWithMultiRegion", 20, true);
}
private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions, boolean shutdownCluster)
throws Exception {
setupCluster();
TableName tableName = TableName.valueOf("testScanner");
try {
createTableAndSnapshot(util, tableName, snapshotName, numRegions);
if (shutdownCluster) {
util.shutdownMiniHBaseCluster();
}
Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
Scan scan = new Scan(bbb, yyy); // limit the scan
TableSnapshotScanner scanner = new TableSnapshotScanner(util.getConfiguration(), restoreDir, snapshotName, scan);
verifyScanner(scanner, bbb, yyy);
scanner.close();
} finally {
if (!shutdownCluster) {
util.getHBaseAdmin().deleteSnapshot(snapshotName);
util.deleteTable(tableName);
tearDownCluster();
}
}
}
private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
throws IOException, InterruptedException {
HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
while (true) {
Result result = scanner.next();
if (result == null) {
break;
}
verifyRow(result);
rowTracker.addRow(result.getRow());
}
// validate all rows are seen
rowTracker.validate();
}
private static void verifyRow(Result result) throws IOException {
byte[] row = result.getRow();
CellScanner scanner = result.cellScanner();
while (scanner.advance()) {
Cell cell = scanner.current();
//assert that all Cells in the Result have the same key
Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
}
for (int j = 0; j < FAMILIES.length; j++) {
byte[] actual = result.getValue(FAMILIES[j], null);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}
}
}
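
For reference, a minimal client-side sketch of the API the test above exercises: TableSnapshotScanner reads a snapshot's files directly from the filesystem rather than going through the region servers. The class name, the snapshot name "my_snapshot" and the restore path below are placeholders, not part of this patch; the constructor signature matches the one used in testScanner.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class SnapshotScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Directory the snapshot is restored into; assumed to be on the same filesystem as the HBase root dir.
    Path restoreDir = new Path("/tmp/snapshot-restore");
    // Limit the scan the same way the test does.
    Scan scan = new Scan(Bytes.toBytes("bbb"), Bytes.toBytes("yyy"));
    TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, restoreDir, "my_snapshot", scan);
    try {
      for (Result result : scanner) {
        System.out.println(Bytes.toStringBinary(result.getRow()));
      }
    } finally {
      scanner.close();
    }
  }
}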

View File

@ -0,0 +1,354 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
@Category(LargeTests.class)
public class TestTableSnapshotInputFormat {
private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final int NUM_REGION_SERVERS = 2;
private static final String TABLE_NAME_STR = "TestTableSnapshotInputFormat";
private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
public static byte[] bbb = Bytes.toBytes("bbb");
public static byte[] yyy = Bytes.toBytes("yyy");
private FileSystem fs;
private Path rootDir;
public void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniCluster(NUM_REGION_SERVERS);
rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
fs = rootDir.getFileSystem(UTIL.getConfiguration());
}
public void tearDownCluster() throws Exception {
UTIL.shutdownMiniCluster();
}
private static void setupConf(Configuration conf) {
// Enable snapshot
conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
}
@After
public void tearDown() throws Exception {
}
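// getBestLocations should return hosts ordered by descending block weight, dropping hosts
// whose weight falls well below the top host's; the assertions below are consistent with a
// cutoff of roughly 80% of the best weight.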
@Test
public void testGetBestLocations() throws IOException {
TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
Configuration conf = UTIL.getConfiguration();
HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
Assert.assertEquals(Lists.newArrayList(), tsif.getBestLocations(conf, blockDistribution));
blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
blockDistribution = new HDFSBlocksDistribution();
blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
Assert.assertEquals(Lists.newArrayList("h1", "h2"), tsif.getBestLocations(conf, blockDistribution));
blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
Assert.assertEquals(Lists.newArrayList("h2", "h1"), tsif.getBestLocations(conf, blockDistribution));
blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);
Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"), tsif.getBestLocations(conf, blockDistribution));
}
public static enum TestTableSnapshotCounters {
VALIDATION_ERROR
}
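// Mapper used by the MR tests below: validates each row read from the snapshot and emits its
// row key, which TestTableSnapshotReducer then checks for full [bbb, yyy) coverage.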
public static class TestTableSnapshotMapper
extends TableMapper<ImmutableBytesWritable, NullWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
// Validate a single row coming from the snapshot, and emit the row key
verifyRowFromMap(key, value);
context.write(key, NullWritable.get());
}
}
public static class TestTableSnapshotReducer
extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(bbb, yyy);
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
rowTracker.addRow(key.get());
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
rowTracker.validate();
}
}
public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
String snapshotName, int numRegions)
throws Exception {
try {
util.deleteTable(tableName);
} catch(Exception ex) {
// ignore
}
if (numRegions > 1) {
util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
} else {
util.createTable(tableName, FAMILIES);
}
HBaseAdmin admin = util.getHBaseAdmin();
// put some stuff in the table
HTable table = new HTable(util.getConfiguration(), tableName);
util.loadTable(table, FAMILIES);
Path rootDir = new Path(util.getConfiguration().get(HConstants.HBASE_DIR));
FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
// load different values
byte[] value = Bytes.toBytes("after_snapshot_value");
util.loadTable(table, FAMILIES, value);
// cause flush to create new files in the region
admin.flush(tableName.toString());
table.close();
}
@Test
public void testWithMockedMapReduceSingleRegion() throws Exception {
testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
}
@Test
public void testWithMockedMapReduceMultiRegion() throws Exception {
testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
}
public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, int numRegions, int expectedNumSplits)
throws Exception {
setupCluster();
TableName tableName = TableName.valueOf("testWithMockedMapReduce");
try {
createTableAndSnapshot(util, tableName, snapshotName, numRegions);
Job job = new Job(util.getConfiguration());
Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
Scan scan = new Scan(bbb, yyy); // limit the scan
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, false, tmpTableDir);
verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, bbb, yyy);
} finally {
util.getHBaseAdmin().deleteSnapshot(snapshotName);
util.deleteTable(tableName);
tearDownCluster();
}
}
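// Drives the InputFormat API directly (getSplits plus createRecordReader with a mocked
// TaskAttemptContext) instead of launching an actual MapReduce job.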
private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
byte[] startRow, byte[] stopRow)
throws IOException, InterruptedException {
TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
List<InputSplit> splits = tsif.getSplits(job);
Assert.assertEquals(expectedNumSplits, splits.size());
HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
for (int i = 0; i < splits.size(); i++) {
// validate input split
InputSplit split = splits.get(i);
Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
// validate record reader
TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
RecordReader<ImmutableBytesWritable, Result> rr = tsif.createRecordReader(split, taskAttemptContext);
rr.initialize(split, taskAttemptContext);
// validate we can read all the data back
while (rr.nextKeyValue()) {
byte[] row = rr.getCurrentKey().get();
verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
rowTracker.addRow(row);
}
rr.close();
}
// validate all rows are seen
rowTracker.validate();
}
public static void verifyRowFromMap(ImmutableBytesWritable key, Result result) throws IOException {
byte[] row = key.get();
CellScanner scanner = result.cellScanner();
while (scanner.advance()) {
Cell cell = scanner.current();
//assert that all Cells in the Result have the same key
Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
}
for (int j = 0; j < FAMILIES.length; j++) {
byte[] actual = result.getValue(FAMILIES[j], null);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}
}
@Test
public void testWithMapReduceSingleRegion() throws Exception {
testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
}
@Test
public void testWithMapReduceMultiRegion() throws Exception {
testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
}
@Test
// run the MR job while HBase is offline
public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
}
private void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
setupCluster();
util.startMiniMapReduceCluster();
try {
Path tableDir = util.getDataTestDirOnTestFS(snapshotName);
TableName tableName = TableName.valueOf("testWithMapReduce");
doTestWithMapReduce(util, tableName, snapshotName, tableDir, numRegions,
expectedNumSplits, shutdownCluster);
} finally {
util.shutdownMiniMapReduceCluster();
tearDownCluster();
}
}
// this is also called by the IntegrationTestTableSnapshotInputFormat
public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, boolean shutdownCluster)
throws Exception {
//create the table and snapshot
createTableAndSnapshot(util, tableName, snapshotName, numRegions);
if (shutdownCluster) {
util.shutdownMiniHBaseCluster();
}
try {
// create the job
Job job = new Job(util.getConfiguration());
Scan scan = new Scan(bbb, yyy); // limit the scan
job.setJarByClass(util.getClass());
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), TestTableSnapshotInputFormat.class);
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, true, tableDir);
job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
job.setNumReduceTasks(1);
job.setOutputFormatClass(NullOutputFormat.class);
Assert.assertTrue(job.waitForCompletion(true));
} finally {
if (!shutdownCluster) {
util.getHBaseAdmin().deleteSnapshot(snapshotName);
util.deleteTable(tableName);
}
}
}
}
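
To complement the tests, here is a minimal sketch of the client-side job setup that TableMapReduceUtil.initTableSnapshotMapperJob enables, mirroring the call in doTestWithMapReduce above. The class names, the snapshot name "my_snapshot" and the restore path are placeholders, not part of this patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SnapshotMapReduceExample {

  // Hypothetical map-only mapper: counts the rows it sees in the snapshot.
  public static class RowCountMapper extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.getCounter("snapshot", "rows").increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "scan-over-snapshot");
    Scan scan = new Scan();                               // optionally restrict with start/stop rows
    Path restoreDir = new Path("/tmp/snapshot-restore");  // placeholder restore directory
    TableMapReduceUtil.initTableSnapshotMapperJob(
        "my_snapshot",                                    // placeholder snapshot name
        scan,
        RowCountMapper.class,
        ImmutableBytesWritable.class,
        NullWritable.class,
        job,
        true,                                             // ship dependency jars with the job
        restoreDir);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Splits are derived from the snapshot's region boundaries and HFile block locations, which is why the job can run even while HBase itself is offline, as testWithMapReduceAndOfflineHBaseMultiRegion verifies.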

View File

@ -664,7 +664,7 @@ public class TestRegionPlacement {
/**
* Create a table with specified table name and region number.
* @param table
* @param tablename
* @param regionNum
* @return
* @throws IOException