HBASE-12783 Create efficient RegionLocator implementation (Solomon Duskis)

This commit is contained in:
tedyu 2015-01-02 19:48:06 -08:00
parent 820f629423
commit ac95cc1fbb
24 changed files with 779 additions and 590 deletions

View File

@ -32,8 +32,6 @@ import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import javax.annotation.Nonnull;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -1000,7 +998,7 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
* 1 if there is a mismatch in the contents * 1 if there is a mismatch in the contents
*/ */
@Override @Override
public int compareTo(@Nonnull final HTableDescriptor other) { public int compareTo(final HTableDescriptor other) {
int result = this.name.compareTo(other.name); int result = this.name.compareTo(other.name);
if (result == 0) { if (result == 0) {
result = families.size() - other.families.size(); result = families.size() - other.families.size();

View File

@ -718,11 +718,7 @@ class ConnectionManager {
@Override @Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException { public RegionLocator getRegionLocator(TableName tableName) throws IOException {
if (managed) { return new HRegionLocator(tableName, this);
throw new IOException("The connection has to be unmanaged.");
}
return new HTable(
tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, getBatchPool());
} }
@Override @Override

View File

@ -0,0 +1,148 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.annotations.VisibleForTesting;
/**
* An implementation of {@link RegionLocator}. Used to view region location information for a single
* HBase table. Lightweight. Get as needed and just close when done. Instances of this class SHOULD
* NOT be constructed directly. Obtain an instance via {@link Connection}. See
* {@link ConnectionFactory} class comment for an example of how.
*
* <p> This class is thread safe
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class HRegionLocator implements RegionLocator {
private final TableName tableName;
private final ClusterConnection connection;
public HRegionLocator(TableName tableName, ClusterConnection connection) {
this.connection = connection;
this.tableName = tableName;
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws IOException {
// This method is required by the RegionLocator interface. This implementation does not have any
// persistent state, so there is no need to do anything here.
}
/**
* {@inheritDoc}
*/
@Override
public HRegionLocation getRegionLocation(final byte [] row)
throws IOException {
return connection.getRegionLocation(tableName, row, false);
}
/**
* {@inheritDoc}
*/
@Override
public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
throws IOException {
return connection.getRegionLocation(tableName, row, reload);
}
@Override
public List<HRegionLocation> getAllRegionLocations() throws IOException {
NavigableMap<HRegionInfo, ServerName> locations =
MetaScanner.allTableRegions(this.connection, getName());
ArrayList<HRegionLocation> regions = new ArrayList<>(locations.size());
for (Entry<HRegionInfo, ServerName> entry : locations.entrySet()) {
regions.add(new HRegionLocation(entry.getKey(), entry.getValue()));
}
return regions;
}
/**
* {@inheritDoc}
*/
@Override
public byte[][] getStartKeys() throws IOException {
return getStartEndKeys().getFirst();
}
/**
* {@inheritDoc}
*/
@Override
public byte[][] getEndKeys() throws IOException {
return getStartEndKeys().getSecond();
}
/**
* {@inheritDoc}
*/
@Override
public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
return getStartEndKeys(listRegionLocations());
}
@VisibleForTesting
Pair<byte[][], byte[][]> getStartEndKeys(List<RegionLocations> regions) {
final byte[][] startKeyList = new byte[regions.size()][];
final byte[][] endKeyList = new byte[regions.size()][];
for (int i = 0; i < regions.size(); i++) {
HRegionInfo region = regions.get(i).getRegionLocation().getRegionInfo();
startKeyList[i] = region.getStartKey();
endKeyList[i] = region.getEndKey();
}
return new Pair<>(startKeyList, endKeyList);
}
@Override
public TableName getName() {
return this.tableName;
}
@VisibleForTesting
List<RegionLocations> listRegionLocations() throws IOException {
return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName());
}
public Configuration getConfiguration() {
return connection.getConfiguration();
}
}

View File

@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -46,7 +45,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
@ -108,7 +106,7 @@ import com.google.protobuf.ServiceException;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Stable @InterfaceStability.Stable
public class HTable implements HTableInterface, RegionLocator { public class HTable implements HTableInterface {
private static final Log LOG = LogFactory.getLog(HTable.class); private static final Log LOG = LogFactory.getLog(HTable.class);
protected ClusterConnection connection; protected ClusterConnection connection;
private final TableName tableName; private final TableName tableName;
@ -125,6 +123,7 @@ public class HTable implements HTableInterface, RegionLocator {
private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close() private final boolean cleanupConnectionOnClose; // close the connection in close()
private Consistency defaultConsistency = Consistency.STRONG; private Consistency defaultConsistency = Consistency.STRONG;
private HRegionLocator locator;
/** The Async process for puts with autoflush set to false or multiputs */ /** The Async process for puts with autoflush set to false or multiputs */
protected AsyncProcess ap; protected AsyncProcess ap;
@ -364,6 +363,7 @@ public class HTable implements HTableInterface, RegionLocator {
// puts need to track errors globally due to how the APIs currently work. // puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory); ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
multiAp = this.connection.getAsyncProcess(); multiAp = this.connection.getAsyncProcess();
this.locator = new HRegionLocator(getName(), connection);
} }
/** /**
@ -473,25 +473,25 @@ public class HTable implements HTableInterface, RegionLocator {
@Deprecated @Deprecated
public HRegionLocation getRegionLocation(final String row) public HRegionLocation getRegionLocation(final String row)
throws IOException { throws IOException {
return connection.getRegionLocation(tableName, Bytes.toBytes(row), false); return getRegionLocation(Bytes.toBytes(row), false);
} }
/** /**
* {@inheritDoc} * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead.
*/ */
@Override @Deprecated
public HRegionLocation getRegionLocation(final byte [] row) public HRegionLocation getRegionLocation(final byte [] row)
throws IOException { throws IOException {
return connection.getRegionLocation(tableName, row, false); return locator.getRegionLocation(row);
} }
/** /**
* {@inheritDoc} * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead.
*/ */
@Override @Deprecated
public HRegionLocation getRegionLocation(final byte [] row, boolean reload) public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
throws IOException { throws IOException {
return connection.getRegionLocation(tableName, row, reload); return locator.getRegionLocation(row, reload);
} }
/** /**
@ -597,45 +597,27 @@ public class HTable implements HTableInterface, RegionLocator {
} }
/** /**
* {@inheritDoc} * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
*/ */
@Override @Deprecated
public byte [][] getStartKeys() throws IOException { public byte [][] getStartKeys() throws IOException {
return getStartEndKeys().getFirst(); return locator.getStartKeys();
} }
/** /**
* {@inheritDoc} * @deprecated Use {@link RegionLocator#getEndKeys()} instead;
*/ */
@Override @Deprecated
public byte[][] getEndKeys() throws IOException { public byte[][] getEndKeys() throws IOException {
return getStartEndKeys().getSecond(); return locator.getEndKeys();
} }
/** /**
* {@inheritDoc} * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
*/ */
@Override @Deprecated
public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException { public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
return locator.getStartEndKeys();
List<RegionLocations> regions = listRegionLocations();
final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
for (RegionLocations locations : regions) {
HRegionInfo region = locations.getRegionLocation().getRegionInfo();
startKeyList.add(region.getStartKey());
endKeyList.add(region.getEndKey());
}
return new Pair<byte [][], byte [][]>(
startKeyList.toArray(new byte[startKeyList.size()][]),
endKeyList.toArray(new byte[endKeyList.size()][]));
}
@VisibleForTesting
List<RegionLocations> listRegionLocations() throws IOException {
return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName());
} }
/** /**
@ -658,15 +640,12 @@ public class HTable implements HTableInterface, RegionLocator {
* This is mainly useful for the MapReduce integration. * This is mainly useful for the MapReduce integration.
* @return A map of HRegionInfo with it's server address * @return A map of HRegionInfo with it's server address
* @throws IOException if a remote or network exception occurs * @throws IOException if a remote or network exception occurs
*
* @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
*/ */
@Override @Deprecated
public List<HRegionLocation> getAllRegionLocations() throws IOException { public List<HRegionLocation> getAllRegionLocations() throws IOException {
NavigableMap<HRegionInfo, ServerName> locations = getRegionLocations(); return locator.getAllRegionLocations();
ArrayList<HRegionLocation> regions = new ArrayList<>(locations.size());
for (Entry<HRegionInfo, ServerName> entry : locations.entrySet()) {
regions.add(new HRegionLocation(entry.getKey(), entry.getValue()));
}
return regions;
} }
/** /**
@ -1924,4 +1903,8 @@ public class HTable implements HTableInterface, RegionLocator {
callbackErrorServers); callbackErrorServers);
} }
} }
public RegionLocator getRegionLocator() {
return this.locator;
}
} }

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes;
public abstract class RegionServerCallable<T> implements RetryingCallable<T> { public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
// Public because used outside of this package over in ipc. // Public because used outside of this package over in ipc.
static final Log LOG = LogFactory.getLog(RegionServerCallable.class); static final Log LOG = LogFactory.getLog(RegionServerCallable.class);
protected final HConnection connection; protected final Connection connection;
protected final TableName tableName; protected final TableName tableName;
protected final byte[] row; protected final byte[] row;
protected HRegionLocation location; protected HRegionLocation location;
@ -61,7 +61,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
* @param tableName Table name to which <code>row</code> belongs. * @param tableName Table name to which <code>row</code> belongs.
* @param row The row we want in <code>tableName</code>. * @param row The row we want in <code>tableName</code>.
*/ */
public RegionServerCallable(HConnection connection, TableName tableName, byte [] row) { public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
this.connection = connection; this.connection = connection;
this.tableName = tableName; this.tableName = tableName;
this.row = row; this.row = row;
@ -75,7 +75,9 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
*/ */
@Override @Override
public void prepare(final boolean reload) throws IOException { public void prepare(final boolean reload) throws IOException {
this.location = connection.getRegionLocation(tableName, row, reload); try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
this.location = regionLocator.getRegionLocation(row, reload);
}
if (this.location == null) { if (this.location == null) {
throw new IOException("Failed to find location, tableName=" + tableName + throw new IOException("Failed to find location, tableName=" + tableName +
", row=" + Bytes.toString(row) + ", reload=" + reload); ", row=" + Bytes.toString(row) + ", reload=" + reload);
@ -87,7 +89,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
* @return {@link HConnection} instance used by this Callable. * @return {@link HConnection} instance used by this Callable.
*/ */
HConnection getConnection() { HConnection getConnection() {
return this.connection; return (HConnection) this.connection;
} }
protected ClientService.BlockingInterface getStub() { protected ClientService.BlockingInterface getStub() {

View File

@ -27,7 +27,6 @@ import java.rmi.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -126,7 +125,6 @@ public class MetaTableLocator {
* @param zkw zookeeper connection to use * @param zkw zookeeper connection to use
* @return server name or null if we failed to get the data. * @return server name or null if we failed to get the data.
*/ */
@Nullable
public ServerName getMetaRegionLocation(final ZooKeeperWatcher zkw) { public ServerName getMetaRegionLocation(final ZooKeeperWatcher zkw) {
try { try {
RegionState state = getMetaRegionState(zkw); RegionState state = getMetaRegionState(zkw);

View File

@ -18,17 +18,10 @@
*/ */
package org.apache.hadoop.hbase.mapreduce; package org.apache.hadoop.hbase.mapreduce;
import java.io.DataInput; import static org.junit.Assert.assertEquals;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -42,20 +35,23 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase; import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.hbase.util.RegionSplitter;
@ -79,7 +75,15 @@ import org.apache.hadoop.util.ToolRunner;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals; import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* Test Bulk Load and MR on a distributed cluster. * Test Bulk Load and MR on a distributed cluster.
@ -250,7 +254,6 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
EnvironmentEdgeManager.currentTime(); EnvironmentEdgeManager.currentTime();
Configuration conf = new Configuration(util.getConfiguration()); Configuration conf = new Configuration(util.getConfiguration());
Path p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration); Path p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
HTable table = new HTable(conf, getTablename());
conf.setBoolean("mapreduce.map.speculative", false); conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false); conf.setBoolean("mapreduce.reduce.speculative", false);
@ -276,9 +279,13 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
// Set where to place the hfiles. // Set where to place the hfiles.
FileOutputFormat.setOutputPath(job, p); FileOutputFormat.setOutputPath(job, p);
try (Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
Table table = conn.getTable(getTablename());
RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
// Configure the partitioner and other things needed for HFileOutputFormat. // Configure the partitioner and other things needed for HFileOutputFormat.
HFileOutputFormat2.configureIncrementalLoad(job, table, table); HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
// Run the job making sure it works. // Run the job making sure it works.
assertEquals(true, job.waitForCompletion(true)); assertEquals(true, job.waitForCompletion(true));
@ -287,7 +294,8 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
// Load the HFiles in. // Load the HFiles in.
loader.doBulkLoad(p, table); loader.doBulkLoad(p, admin, table, regionLocator);
}
// Delete the files. // Delete the files.
util.getTestFileSystem().delete(p, true); util.getTestFileSystem().delete(p, true);

View File

@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
@ -87,7 +88,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
*/ */
public static void configureIncrementalLoad(Job job, HTable table) public static void configureIncrementalLoad(Job job, HTable table)
throws IOException { throws IOException {
HFileOutputFormat2.configureIncrementalLoad(job, table, table); HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
table.getRegionLocator());
} }
/** /**
@ -150,20 +152,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
HFileOutputFormat2.configurePartitioner(job, splitPoints); HFileOutputFormat2.configurePartitioner(job, splitPoints);
} }
/**
* Serialize column family to compression algorithm map to configuration.
* Invoked while configuring the MR job for incremental load.
*
* @param table to read the properties from
* @param conf to persist serialized values into
* @throws IOException
* on failure to read column family descriptors
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
@VisibleForTesting
static void configureCompression(Table table, Configuration conf) throws IOException { static void configureCompression(Table table, Configuration conf) throws IOException {
HFileOutputFormat2.configureCompression(table, conf); HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor());
} }
/** /**
@ -177,7 +167,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
*/ */
@VisibleForTesting @VisibleForTesting
static void configureBlockSize(Table table, Configuration conf) throws IOException { static void configureBlockSize(Table table, Configuration conf) throws IOException {
HFileOutputFormat2.configureBlockSize(table, conf); HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf);
} }
/** /**
@ -191,7 +181,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
*/ */
@VisibleForTesting @VisibleForTesting
static void configureBloomType(Table table, Configuration conf) throws IOException { static void configureBloomType(Table table, Configuration conf) throws IOException {
HFileOutputFormat2.configureBloomType(table, conf); HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf);
} }
/** /**
@ -206,6 +196,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
@VisibleForTesting @VisibleForTesting
static void configureDataBlockEncoding(Table table, static void configureDataBlockEncoding(Table table,
Configuration conf) throws IOException { Configuration conf) throws IOException {
HFileOutputFormat2.configureDataBlockEncoding(table, conf); HTableDescriptor tableDescriptor = table.getTableDescriptor();
HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
} }
} }

View File

@ -80,7 +80,7 @@ import com.google.common.annotations.VisibleForTesting;
* all HFiles being written. * all HFiles being written.
* <p> * <p>
* Using this class as part of a MapReduce job is best done * Using this class as part of a MapReduce job is best done
* using {@link #configureIncrementalLoad(Job, Table, RegionLocator, Class)}. * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator, Class)}.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
@ -364,7 +364,7 @@ public class HFileOutputFormat2
@Deprecated @Deprecated
public static void configureIncrementalLoad(Job job, HTable table) public static void configureIncrementalLoad(Job job, HTable table)
throws IOException { throws IOException {
configureIncrementalLoad(job, table, table); configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator());
} }
/** /**
@ -383,13 +383,32 @@ public class HFileOutputFormat2
*/ */
public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
throws IOException { throws IOException {
configureIncrementalLoad(job, table, regionLocator, HFileOutputFormat2.class); configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
} }
static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator, /**
Class<? extends OutputFormat<?, ?>> cls) throws IOException { * Configure a MapReduce Job to perform an incremental load into the given
Configuration conf = job.getConfiguration(); * table. This
* <ul>
* <li>Inspects the table to configure a total order partitioner</li>
* <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
* <li>Sets the number of reduce tasks to match the current number of regions</li>
* <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
* <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
* PutSortReducer)</li>
* </ul>
* The user should be sure to set the map output value class to either KeyValue or Put before
* running this function.
*/
public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
RegionLocator regionLocator) throws IOException {
configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class);
}
static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException,
UnsupportedEncodingException {
Configuration conf = job.getConfiguration();
job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class); job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(cls); job.setOutputFormatClass(cls);
@ -412,7 +431,7 @@ public class HFileOutputFormat2
KeyValueSerialization.class.getName()); KeyValueSerialization.class.getName());
// Use table's region boundaries for TOP split points. // Use table's region boundaries for TOP split points.
LOG.info("Looking up current regions for table " + table.getName()); LOG.info("Looking up current regions for table " + regionLocator.getName());
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator); List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " + LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
"to match current region count"); "to match current region count");
@ -420,14 +439,14 @@ public class HFileOutputFormat2
configurePartitioner(job, startKeys); configurePartitioner(job, startKeys);
// Set compression algorithms based on column families // Set compression algorithms based on column families
configureCompression(table, conf); configureCompression(conf, tableDescriptor);
configureBloomType(table, conf); configureBloomType(tableDescriptor, conf);
configureBlockSize(table, conf); configureBlockSize(tableDescriptor, conf);
configureDataBlockEncoding(table, conf); configureDataBlockEncoding(tableDescriptor, conf);
TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job); TableMapReduceUtil.initCredentials(job);
LOG.info("Incremental table " + table.getName() + " output configured."); LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
} }
public static void configureIncrementalLoadMap(Job job, Table table) throws IOException { public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
@ -438,10 +457,11 @@ public class HFileOutputFormat2
job.setOutputFormatClass(HFileOutputFormat2.class); job.setOutputFormatClass(HFileOutputFormat2.class);
// Set compression algorithms based on column families // Set compression algorithms based on column families
configureCompression(table, conf); configureCompression(conf, table.getTableDescriptor());
configureBloomType(table, conf); configureBloomType(table.getTableDescriptor(), conf);
configureBlockSize(table, conf); configureBlockSize(table.getTableDescriptor(), conf);
configureDataBlockEncoding(table, conf); HTableDescriptor tableDescriptor = table.getTableDescriptor();
configureDataBlockEncoding(tableDescriptor, conf);
TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job); TableMapReduceUtil.initCredentials(job);
@ -590,10 +610,9 @@ public class HFileOutputFormat2
@edu.umd.cs.findbugs.annotations.SuppressWarnings( @edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
@VisibleForTesting @VisibleForTesting
static void configureCompression( static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
Table table, Configuration conf) throws IOException { throws UnsupportedEncodingException {
StringBuilder compressionConfigValue = new StringBuilder(); StringBuilder compressionConfigValue = new StringBuilder();
HTableDescriptor tableDescriptor = table.getTableDescriptor();
if(tableDescriptor == null){ if(tableDescriptor == null){
// could happen with mock table instance // could happen with mock table instance
return; return;
@ -617,17 +636,16 @@ public class HFileOutputFormat2
/** /**
* Serialize column family to block size map to configuration. * Serialize column family to block size map to configuration.
* Invoked while configuring the MR job for incremental load. * Invoked while configuring the MR job for incremental load.
* * @param tableDescriptor to read the properties from
* @param table to read the properties from
* @param conf to persist serialized values into * @param conf to persist serialized values into
*
* @throws IOException * @throws IOException
* on failure to read column family descriptors * on failure to read column family descriptors
*/ */
@VisibleForTesting @VisibleForTesting
static void configureBlockSize( static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
Table table, Configuration conf) throws IOException { throws UnsupportedEncodingException {
StringBuilder blockSizeConfigValue = new StringBuilder(); StringBuilder blockSizeConfigValue = new StringBuilder();
HTableDescriptor tableDescriptor = table.getTableDescriptor();
if (tableDescriptor == null) { if (tableDescriptor == null) {
// could happen with mock table instance // could happen with mock table instance
return; return;
@ -651,16 +669,15 @@ public class HFileOutputFormat2
/** /**
* Serialize column family to bloom type map to configuration. * Serialize column family to bloom type map to configuration.
* Invoked while configuring the MR job for incremental load. * Invoked while configuring the MR job for incremental load.
* * @param tableDescriptor to read the properties from
* @param table to read the properties from
* @param conf to persist serialized values into * @param conf to persist serialized values into
*
* @throws IOException * @throws IOException
* on failure to read column family descriptors * on failure to read column family descriptors
*/ */
@VisibleForTesting @VisibleForTesting
static void configureBloomType( static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
Table table, Configuration conf) throws IOException { throws UnsupportedEncodingException {
HTableDescriptor tableDescriptor = table.getTableDescriptor();
if (tableDescriptor == null) { if (tableDescriptor == null) {
// could happen with mock table instance // could happen with mock table instance
return; return;
@ -694,9 +711,8 @@ public class HFileOutputFormat2
* on failure to read column family descriptors * on failure to read column family descriptors
*/ */
@VisibleForTesting @VisibleForTesting
static void configureDataBlockEncoding(Table table, static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
Configuration conf) throws IOException { Configuration conf) throws UnsupportedEncodingException {
HTableDescriptor tableDescriptor = table.getTableDescriptor();
if (tableDescriptor == null) { if (tableDescriptor == null) {
// could happen with mock table instance // could happen with mock table instance
return; return;

View File

@ -18,20 +18,8 @@
*/ */
package org.apache.hadoop.hbase.mapreduce; package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -42,13 +30,18 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -63,6 +56,16 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
/** /**
* Import data written by {@link Export}. * Import data written by {@link Export}.
@ -446,15 +449,18 @@ public class Import extends Configured implements Tool {
if (hfileOutPath != null) { if (hfileOutPath != null) {
job.setMapperClass(KeyValueImporter.class); job.setMapperClass(KeyValueImporter.class);
HTable table = new HTable(conf, tableName); try (Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)){
job.setReducerClass(KeyValueSortReducer.class); job.setReducerClass(KeyValueSortReducer.class);
Path outputDir = new Path(hfileOutPath); Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir); FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class); job.setMapOutputValueClass(KeyValue.class);
HFileOutputFormat2.configureIncrementalLoad(job, table, table); HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
com.google.common.base.Preconditions.class); com.google.common.base.Preconditions.class);
}
} else { } else {
// No reducers. Just write straight to table. Call initTableReducerJob // No reducers. Just write straight to table. Call initTableReducerJob
// because it sets up the TableOutputFormat. // because it sets up the TableOutputFormat.

View File

@ -20,17 +20,13 @@ package org.apache.hadoop.hbase.mapreduce;
import static java.lang.String.format; import static java.lang.String.format;
import java.io.File; import com.google.common.base.Preconditions;
import java.io.IOException; import com.google.common.base.Splitter;
import java.util.ArrayList; import com.google.common.collect.Lists;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -40,11 +36,14 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -59,9 +58,11 @@ import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Preconditions; import java.io.File;
import com.google.common.base.Splitter; import java.io.IOException;
import com.google.common.collect.Lists; import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
/** /**
* Tool to import data from a TSV file. * Tool to import data from a TSV file.
@ -496,7 +497,8 @@ public class ImportTsv extends Configured implements Tool {
throw new TableNotFoundException(errorMsg); throw new TableNotFoundException(errorMsg);
} }
} }
try (HTable table = (HTable)connection.getTable(tableName)) { try (Table table = connection.getTable(tableName);
RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false); boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
// if no.strict is false then check column family // if no.strict is false then check column family
if(!noStrict) { if(!noStrict) {
@ -534,7 +536,8 @@ public class ImportTsv extends Configured implements Tool {
job.setMapOutputValueClass(Put.class); job.setMapOutputValueClass(Put.class);
job.setCombinerClass(PutCombiner.class); job.setCombinerClass(PutCombiner.class);
} }
HFileOutputFormat2.configureIncrementalLoad(job, table, table); HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
regionLocator);
} }
} else { } else {
if (!admin.tableExists(tableName)) { if (!admin.tableExists(tableName)) {

View File

@ -20,42 +20,21 @@ package org.apache.hadoop.hbase.mapreduce;
import static java.lang.String.format; import static java.lang.String.format;
import java.io.FileNotFoundException; import com.google.common.collect.HashMultimap;
import java.io.IOException; import com.google.common.collect.Multimap;
import java.io.InterruptedIOException; import com.google.common.collect.Multimaps;
import java.nio.ByteBuffer; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.mutable.MutableInt; import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -64,10 +43,14 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
@ -95,12 +78,30 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import com.google.common.collect.HashMultimap; import java.io.FileNotFoundException;
import com.google.common.collect.Multimap; import java.io.IOException;
import com.google.common.collect.Multimaps; import java.io.InterruptedIOException;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* Tool to load the output of HFileOutputFormat into an existing table. * Tool to load the output of HFileOutputFormat into an existing table.
@ -235,12 +236,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
public void doBulkLoad(Path hfofDir, final HTable table) public void doBulkLoad(Path hfofDir, final HTable table)
throws TableNotFoundException, IOException throws TableNotFoundException, IOException
{ {
final HConnection conn = table.getConnection(); doBulkLoad(hfofDir, table.getConnection().getAdmin(), table, table.getRegionLocator());
}
if (!conn.isTableAvailable(table.getName())) { /**
throw new TableNotFoundException("Table " + * Perform a bulk load of the given directory into the given
Bytes.toStringBinary(table.getTableName()) + * pre-existing table. This method is not threadsafe.
"is not currently available."); *
* @param hfofDir the directory that was provided as the output path
* of a job using HFileOutputFormat
* @param table the table to load into
* @throws TableNotFoundException if table does not yet exist
*/
@SuppressWarnings("deprecation")
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
RegionLocator regionLocator) throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
throw new TableNotFoundException("Table " + table.getName() + "is not currently available.");
} }
// initialize thread pools // initialize thread pools
@ -276,7 +289,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
String msg = String msg =
"Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
+ unmatchedFamilies + "; valid family names of table " + unmatchedFamilies + "; valid family names of table "
+ Bytes.toString(table.getTableName()) + " are: " + familyNames; + table.getName() + " are: " + familyNames;
LOG.error(msg); LOG.error(msg);
throw new IOException(msg); throw new IOException(msg);
} }
@ -300,7 +313,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// Assumes that region splits can happen while this occurs. // Assumes that region splits can happen while this occurs.
while (!queue.isEmpty()) { while (!queue.isEmpty()) {
// need to reload split keys each iteration. // need to reload split keys each iteration.
final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys(); final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
if (count != 0) { if (count != 0) {
LOG.info("Split occured while grouping HFiles, retry attempt " + LOG.info("Split occured while grouping HFiles, retry attempt " +
+ count + " with " + queue.size() + " files remaining to group or split"); + count + " with " + queue.size() + " files remaining to group or split");
@ -323,7 +336,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
+ " hfiles to one family of one region"); + " hfiles to one family of one region");
} }
bulkLoadPhase(table, conn, pool, queue, regionGroups); bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
// NOTE: The next iteration's split / group could happen in parallel to // NOTE: The next iteration's split / group could happen in parallel to
// atomic bulkloads assuming that there are splits and no merges, and // atomic bulkloads assuming that there are splits and no merges, and
@ -359,7 +372,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* them. Any failures are re-queued for another pass with the * them. Any failures are re-queued for another pass with the
* groupOrSplitPhase. * groupOrSplitPhase.
*/ */
protected void bulkLoadPhase(final Table table, final HConnection conn, protected void bulkLoadPhase(final Table table, final Connection conn,
ExecutorService pool, Deque<LoadQueueItem> queue, ExecutorService pool, Deque<LoadQueueItem> queue,
final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException { final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
// atomically bulk load the groups. // atomically bulk load the groups.
@ -431,7 +444,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
* bulk load region targets. * bulk load region targets.
*/ */
private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table, private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table,
ExecutorService pool, Deque<LoadQueueItem> queue, ExecutorService pool, Deque<LoadQueueItem> queue,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException { final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
// <region start key, LQI> need synchronized only within this scope of this // <region start key, LQI> need synchronized only within this scope of this
@ -524,7 +537,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @throws IOException * @throws IOException
*/ */
protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups, protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final HTable table, final LoadQueueItem item, final Table table,
final Pair<byte[][], byte[][]> startEndKeys) final Pair<byte[][], byte[][]> startEndKeys)
throws IOException { throws IOException {
final Path hfilePath = item.hfilePath; final Path hfilePath = item.hfilePath;
@ -569,18 +582,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/ */
if (indexForCallable < 0) { if (indexForCallable < 0) {
throw new IOException("The first region info for table " throw new IOException("The first region info for table "
+ Bytes.toString(table.getTableName()) + table.getName()
+ " cann't be found in hbase:meta.Please use hbck tool to fix it first."); + " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
} else if ((indexForCallable == startEndKeys.getFirst().length - 1) } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
&& !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) { && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
throw new IOException("The last region info for table " throw new IOException("The last region info for table "
+ Bytes.toString(table.getTableName()) + table.getName()
+ " cann't be found in hbase:meta.Please use hbck tool to fix it first."); + " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
} else if (indexForCallable + 1 < startEndKeys.getFirst().length } else if (indexForCallable + 1 < startEndKeys.getFirst().length
&& !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable], && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
startEndKeys.getFirst()[indexForCallable + 1]) == 0)) { startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
throw new IOException("The endkey of one region for table " throw new IOException("The endkey of one region for table "
+ Bytes.toString(table.getTableName()) + table.getName()
+ " is not equal to the startkey of the next region in hbase:meta." + " is not equal to the startkey of the next region in hbase:meta."
+ "Please use hbck tool to fix it first."); + "Please use hbck tool to fix it first.");
} }
@ -601,7 +614,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
} }
/** /**
* @deprecated Use {@link #tryAtomicRegionLoad(HConnection, TableName, byte[], Collection)} * @deprecated Use {@link #tryAtomicRegionLoad(Connection, TableName, byte[], Collection)}
*/ */
@Deprecated @Deprecated
protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn, protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
@ -623,7 +636,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @return empty list if success, list of items to retry on recoverable * @return empty list if success, list of items to retry on recoverable
* failure * failure
*/ */
protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn, protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis) final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
throws IOException { throws IOException {
final List<Pair<byte[], String>> famPaths = final List<Pair<byte[], String>> famPaths =

View File

@ -155,14 +155,10 @@ public abstract class MultiTableInputFormatBase extends
Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next(); Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
TableName tableName = entry.getKey(); TableName tableName = entry.getKey();
List<Scan> scanList = entry.getValue(); List<Scan> scanList = entry.getValue();
Table table = null;
RegionLocator regionLocator = null;
Connection conn = null;
try{ try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
conn = ConnectionFactory.createConnection(context.getConfiguration()); Table table = conn.getTable(tableName);
table = conn.getTable(tableName); RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
regionLocator = (RegionLocator) table;
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator( RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
regionLocator, conn.getAdmin()); regionLocator, conn.getAdmin());
Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys(); Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
@ -210,10 +206,6 @@ public abstract class MultiTableInputFormatBase extends
} }
} }
} }
} finally {
if (null != table) table.close();
if (null != regionLocator) regionLocator.close();
if (null != conn) conn.close();
} }
} }

View File

@ -549,7 +549,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
@Deprecated @Deprecated
protected void setHTable(HTable table) throws IOException { protected void setHTable(HTable table) throws IOException {
this.table = table; this.table = table;
this.regionLocator = table; this.regionLocator = table.getRegionLocator();
this.admin = table.getConnection().getAdmin(); this.admin = table.getConnection().getAdmin();
} }

View File

@ -17,16 +17,8 @@
*/ */
package org.apache.hadoop.hbase.mapreduce; package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -36,14 +28,19 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@ -52,6 +49,12 @@ import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.TreeMap;
/** /**
* A tool to replay WAL files as a M/R job. * A tool to replay WAL files as a M/R job.
* The WAL can be replayed for a set of tables or all tables, * The WAL can be replayed for a set of tables or all tables,
@ -259,13 +262,17 @@ public class WALPlayer extends Configured implements Tool {
if (tables.length != 1) { if (tables.length != 1) {
throw new IOException("Exactly one table must be specified for the bulk export option"); throw new IOException("Exactly one table must be specified for the bulk export option");
} }
HTable table = new HTable(conf, TableName.valueOf(tables[0])); TableName tableName = TableName.valueOf(tables[0]);
job.setMapperClass(WALKeyValueMapper.class); job.setMapperClass(WALKeyValueMapper.class);
job.setReducerClass(KeyValueSortReducer.class); job.setReducerClass(KeyValueSortReducer.class);
Path outputDir = new Path(hfileOutPath); Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir); FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(KeyValue.class); job.setMapOutputValueClass(KeyValue.class);
HFileOutputFormat2.configureIncrementalLoad(job, table, table); try (Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
}
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
com.google.common.base.Preconditions.class); com.google.common.base.Preconditions.class);
} else { } else {

View File

@ -68,7 +68,7 @@ public class RegionSizeCalculator {
public RegionSizeCalculator(HTable table) throws IOException { public RegionSizeCalculator(HTable table) throws IOException {
HBaseAdmin admin = new HBaseAdmin(table.getConfiguration()); HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
try { try {
init(table, admin); init(table.getRegionLocator(), admin);
} finally { } finally {
admin.close(); admin.close();
} }

View File

@ -2209,7 +2209,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
final byte[] columnFamily, byte [][] startKeys) final byte[] columnFamily, byte [][] startKeys)
throws IOException { throws IOException {
Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
Table meta = new HTable(c, TableName.META_TABLE_NAME); try (Table meta = new HTable(c, TableName.META_TABLE_NAME)) {
HTableDescriptor htd = table.getTableDescriptor(); HTableDescriptor htd = table.getTableDescriptor();
if(!htd.hasFamily(columnFamily)) { if(!htd.hasFamily(columnFamily)) {
HColumnDescriptor hcd = new HColumnDescriptor(columnFamily); HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
@ -2249,17 +2249,16 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
HConnection conn = table.getConnection(); HConnection conn = table.getConnection();
conn.clearRegionCache(); conn.clearRegionCache();
// assign all the new regions IF table is enabled. // assign all the new regions IF table is enabled.
Admin admin = getHBaseAdmin(); Admin admin = conn.getAdmin();
if (admin.isTableEnabled(table.getName())) { if (admin.isTableEnabled(table.getName())) {
for(HRegionInfo hri : newRegions) { for(HRegionInfo hri : newRegions) {
admin.assign(hri.getRegionName()); admin.assign(hri.getRegionName());
} }
} }
meta.close();
return count; return count;
} }
}
/** /**
* Create rows in hbase:meta for regions of the specified table with the specified * Create rows in hbase:meta for regions of the specified table with the specified
@ -3548,10 +3547,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
} }
public static int getMetaRSPort(Configuration conf) throws IOException { public static int getMetaRSPort(Configuration conf) throws IOException {
RegionLocator table = new HTable(conf, TableName.META_TABLE_NAME); try (Connection c = ConnectionFactory.createConnection();
HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes("")); RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) {
table.close(); return locator.getRegionLocation(Bytes.toBytes("")).getPort();
return hloc.getPort(); }
} }
/** /**

View File

@ -26,8 +26,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -69,7 +67,7 @@ public class TestRegionRebalancing {
private static final byte[] FAMILY_NAME = Bytes.toBytes("col"); private static final byte[] FAMILY_NAME = Bytes.toBytes("col");
public static final Log LOG = LogFactory.getLog(TestRegionRebalancing.class); public static final Log LOG = LogFactory.getLog(TestRegionRebalancing.class);
private final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private RegionLocator table; private RegionLocator regionLocator;
private HTableDescriptor desc; private HTableDescriptor desc;
private String balancerName; private String balancerName;
@ -101,17 +99,17 @@ public class TestRegionRebalancing {
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public void testRebalanceOnRegionServerNumberChange() public void testRebalanceOnRegionServerNumberChange()
throws IOException, InterruptedException { throws IOException, InterruptedException {
Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); try(Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
Admin admin = connection.getAdmin(); Admin admin = connection.getAdmin()) {
admin.createTable(this.desc, Arrays.copyOfRange(HBaseTestingUtility.KEYS, admin.createTable(this.desc, Arrays.copyOfRange(HBaseTestingUtility.KEYS,
1, HBaseTestingUtility.KEYS.length)); 1, HBaseTestingUtility.KEYS.length));
this.table = new HTable(UTIL.getConfiguration(), this.desc.getTableName()); this.regionLocator = connection.getRegionLocator(this.desc.getTableName());
MetaTableAccessor.fullScanMetaAndPrint(admin.getConnection()); MetaTableAccessor.fullScanMetaAndPrint(admin.getConnection());
assertEquals("Test table should have right number of regions", assertEquals("Test table should have right number of regions",
HBaseTestingUtility.KEYS.length, HBaseTestingUtility.KEYS.length,
this.table.getStartKeys().length); this.regionLocator.getStartKeys().length);
// verify that the region assignments are balanced to start out // verify that the region assignments are balanced to start out
assertRegionsAreBalanced(); assertRegionsAreBalanced();
@ -152,8 +150,8 @@ public class TestRegionRebalancing {
} }
assert(UTIL.getHBaseCluster().getMaster().balance() == true); assert(UTIL.getHBaseCluster().getMaster().balance() == true);
assertRegionsAreBalanced(); assertRegionsAreBalanced();
table.close(); regionLocator.close();
admin.close(); }
} }
/** /**

View File

@ -26,24 +26,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -108,6 +90,22 @@ import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* Run tests that use the HBase clients; {@link HTable}. * Run tests that use the HBase clients; {@link HTable}.
* Sets up the HBase mini cluster once at start and runs through all client tests. * Sets up the HBase mini cluster once at start and runs through all client tests.
@ -5204,8 +5202,8 @@ public class TestFromClientSide {
TableName TABLE = TableName.valueOf("testNonCachedGetRegionLocation"); TableName TABLE = TableName.valueOf("testNonCachedGetRegionLocation");
byte [] family1 = Bytes.toBytes("f1"); byte [] family1 = Bytes.toBytes("f1");
byte [] family2 = Bytes.toBytes("f2"); byte [] family2 = Bytes.toBytes("f2");
HTable table = TEST_UTIL.createTable(TABLE, new byte[][] {family1, family2}, 10); try (HTable table = TEST_UTIL.createTable(TABLE, new byte[][] {family1, family2}, 10);
Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration())) {
Map <HRegionInfo, ServerName> regionsMap = table.getRegionLocations(); Map <HRegionInfo, ServerName> regionsMap = table.getRegionLocations();
assertEquals(1, regionsMap.size()); assertEquals(1, regionsMap.size());
HRegionInfo regionInfo = regionsMap.keySet().iterator().next(); HRegionInfo regionInfo = regionsMap.keySet().iterator().next();
@ -5239,6 +5237,7 @@ public class TestFromClientSide {
assertTrue(addrAfter.getPort() != addrCache.getPort()); assertTrue(addrAfter.getPort() != addrCache.getPort());
assertEquals(addrAfter.getPort(), addrNoCache.getPort()); assertEquals(addrAfter.getPort(), addrNoCache.getPort());
} }
}
@Test @Test
/** /**
@ -6248,9 +6247,12 @@ public class TestFromClientSide {
HColumnDescriptor fam = new HColumnDescriptor(FAMILY); HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
htd.addFamily(fam); htd.addFamily(fam);
byte[][] KEYS = HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE; byte[][] KEYS = HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE;
TEST_UTIL.getHBaseAdmin().createTable(htd, KEYS); HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
List<HRegionInfo> regions = TEST_UTIL.getHBaseAdmin().getTableRegions(htd.getTableName()); admin.createTable(htd, KEYS);
List<HRegionInfo> regions = admin.getTableRegions(htd.getTableName());
HRegionLocator locator =
(HRegionLocator) admin.getConnection().getRegionLocator(htd.getTableName());
for (int regionReplication = 1; regionReplication < 4; regionReplication++) { for (int regionReplication = 1; regionReplication < 4; regionReplication++) {
List<RegionLocations> regionLocations = new ArrayList<RegionLocations>(); List<RegionLocations> regionLocations = new ArrayList<RegionLocations>();
@ -6263,10 +6265,7 @@ public class TestFromClientSide {
regionLocations.add(new RegionLocations(arr)); regionLocations.add(new RegionLocations(arr));
} }
HTable table = spy(new HTable(TEST_UTIL.getConfiguration(), htd.getTableName())); Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys(regionLocations);
when(table.listRegionLocations()).thenReturn(regionLocations);
Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
assertEquals(KEYS.length + 1, startEndKeys.getFirst().length); assertEquals(KEYS.length + 1, startEndKeys.getFirst().length);
@ -6276,9 +6275,6 @@ public class TestFromClientSide {
assertArrayEquals(startKey, startEndKeys.getFirst()[i]); assertArrayEquals(startKey, startEndKeys.getFirst()[i]);
assertArrayEquals(endKey, startEndKeys.getSecond()[i]); assertArrayEquals(endKey, startEndKeys.getSecond()[i]);
} }
table.close();
} }
} }
} }

View File

@ -53,8 +53,10 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims; import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation; import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableDescriptor;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HRegionLocator;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionLocator;
@ -336,9 +338,10 @@ public class TestHFileOutputFormat {
public void testJobConfiguration() throws Exception { public void testJobConfiguration() throws Exception {
Job job = new Job(util.getConfiguration()); Job job = new Job(util.getConfiguration());
job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
HTable table = Mockito.mock(HTable.class); HTableDescriptor tableDescriptor = Mockito.mock(HTableDescriptor.class);
setupMockStartKeys(table); RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
HFileOutputFormat.configureIncrementalLoad(job, table); setupMockStartKeys(regionLocator);
HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
assertEquals(job.getNumReduceTasks(), 4); assertEquals(job.getNumReduceTasks(), 4);
} }
@ -468,12 +471,13 @@ public class TestHFileOutputFormat {
MutationSerialization.class.getName(), ResultSerialization.class.getName(), MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName()); KeyValueSerialization.class.getName());
setupRandomGeneratorMapper(job); setupRandomGeneratorMapper(job);
HFileOutputFormat.configureIncrementalLoad(job, table); HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
table.getRegionLocator());
FileOutputFormat.setOutputPath(job, outDir); FileOutputFormat.setOutputPath(job, outDir);
Assert.assertFalse( util.getTestFileSystem().exists(outDir)) ; Assert.assertFalse( util.getTestFileSystem().exists(outDir)) ;
assertEquals(table.getRegionLocations().size(), job.getNumReduceTasks()); assertEquals(table.getRegionLocator().getAllRegionLocations().size(), job.getNumReduceTasks());
assertTrue(job.waitForCompletion(true)); assertTrue(job.waitForCompletion(true));
} }
@ -770,14 +774,14 @@ public class TestHFileOutputFormat {
return familyToDataBlockEncoding; return familyToDataBlockEncoding;
} }
private void setupMockStartKeys(RegionLocator table) throws IOException { private void setupMockStartKeys(RegionLocator regionLocator) throws IOException {
byte[][] mockKeys = new byte[][] { byte[][] mockKeys = new byte[][] {
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
Bytes.toBytes("aaa"), Bytes.toBytes("aaa"),
Bytes.toBytes("ggg"), Bytes.toBytes("ggg"),
Bytes.toBytes("zzz") Bytes.toBytes("zzz")
}; };
Mockito.doReturn(mockKeys).when(table).getStartKeys(); Mockito.doReturn(mockKeys).when(regionLocator).getStartKeys();
} }
/** /**
@ -792,7 +796,8 @@ public class TestHFileOutputFormat {
Path dir = util.getDataTestDir("testColumnFamilySettings"); Path dir = util.getDataTestDir("testColumnFamilySettings");
// Setup table descriptor // Setup table descriptor
HTable table = Mockito.mock(HTable.class); Table table = Mockito.mock(Table.class);
RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
HTableDescriptor htd = new HTableDescriptor(TABLE_NAME); HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
Mockito.doReturn(htd).when(table).getTableDescriptor(); Mockito.doReturn(htd).when(table).getTableDescriptor();
for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) { for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
@ -800,7 +805,7 @@ public class TestHFileOutputFormat {
} }
// set up the table to return some mock keys // set up the table to return some mock keys
setupMockStartKeys(table); setupMockStartKeys(regionLocator);
try { try {
// partial map red setup to get an operational writer for testing // partial map red setup to get an operational writer for testing
@ -810,7 +815,7 @@ public class TestHFileOutputFormat {
Job job = new Job(conf, "testLocalMRIncrementalLoad"); Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
setupRandomGeneratorMapper(job); setupRandomGeneratorMapper(job);
HFileOutputFormat.configureIncrementalLoad(job, table); HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
FileOutputFormat.setOutputPath(job, dir); FileOutputFormat.setOutputPath(job, dir);
context = createTestTaskAttemptContext(job); context = createTestTaskAttemptContext(job);
HFileOutputFormat hof = new HFileOutputFormat(); HFileOutputFormat hof = new HFileOutputFormat();

View File

@ -25,15 +25,6 @@ import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -52,7 +43,9 @@ import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation; import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionLocator;
@ -87,6 +80,16 @@ import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
/** /**
* Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}. * Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}.
* Sets up and runs a mapreduce job that writes hfile output. * Sets up and runs a mapreduce job that writes hfile output.
@ -132,6 +135,7 @@ public class TestHFileOutputFormat2 {
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
} }
@Override
protected void map( protected void map(
NullWritable n1, NullWritable n2, NullWritable n1, NullWritable n2,
Mapper<NullWritable, NullWritable, Mapper<NullWritable, NullWritable,
@ -216,7 +220,7 @@ public class TestHFileOutputFormat2 {
} }
private TaskAttemptContext createTestTaskAttemptContext(final Job job) private TaskAttemptContext createTestTaskAttemptContext(final Job job)
throws IOException, Exception { throws Exception {
HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
TaskAttemptContext context = hadoop.createTestTaskAttemptContext( TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
job, "attempt_201402131733_0001_m_000000_0"); job, "attempt_201402131733_0001_m_000000_0");
@ -335,9 +339,10 @@ public class TestHFileOutputFormat2 {
public void testJobConfiguration() throws Exception { public void testJobConfiguration() throws Exception {
Job job = new Job(util.getConfiguration()); Job job = new Job(util.getConfiguration());
job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
HTable table = Mockito.mock(HTable.class); Table table = Mockito.mock(Table.class);
setupMockStartKeys(table); RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
HFileOutputFormat2.configureIncrementalLoad(job, table, table); setupMockStartKeys(regionLocator);
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
assertEquals(job.getNumReduceTasks(), 4); assertEquals(job.getNumReduceTasks(), 4);
} }
@ -370,12 +375,10 @@ public class TestHFileOutputFormat2 {
util = new HBaseTestingUtility(); util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration(); Configuration conf = util.getConfiguration();
byte[][] startKeys = generateRandomStartKeys(5); byte[][] startKeys = generateRandomStartKeys(5);
HBaseAdmin admin = null;
try {
util.startMiniCluster(); util.startMiniCluster();
try (HTable table = util.createTable(TABLE_NAME, FAMILIES);
Admin admin = table.getConnection().getAdmin()) {
Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
admin = new HBaseAdmin(conf);
HTable table = util.createTable(TABLE_NAME, FAMILIES);
assertEquals("Should start with empty table", assertEquals("Should start with empty table",
0, util.countRows(table)); 0, util.countRows(table));
int numRegions = util.createMultiRegions( int numRegions = util.createMultiRegions(
@ -384,7 +387,7 @@ public class TestHFileOutputFormat2 {
// Generate the bulk load files // Generate the bulk load files
util.startMiniMapReduceCluster(); util.startMiniMapReduceCluster();
runIncrementalPELoad(conf, table, testDir); runIncrementalPELoad(conf, table.getTableDescriptor(), table.getRegionLocator(), testDir);
// This doesn't write into the table, just makes files // This doesn't write into the table, just makes files
assertEquals("HFOF should not touch actual table", assertEquals("HFOF should not touch actual table",
0, util.countRows(table)); 0, util.countRows(table));
@ -404,7 +407,7 @@ public class TestHFileOutputFormat2 {
// handle the split case // handle the split case
if (shouldChangeRegions) { if (shouldChangeRegions) {
LOG.info("Changing regions in table"); LOG.info("Changing regions in table");
admin.disableTable(table.getTableName()); admin.disableTable(table.getName());
while(util.getMiniHBaseCluster().getMaster().getAssignmentManager(). while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
getRegionStates().isRegionsInTransition()) { getRegionStates().isRegionsInTransition()) {
Threads.sleep(200); Threads.sleep(200);
@ -413,9 +416,9 @@ public class TestHFileOutputFormat2 {
byte[][] newStartKeys = generateRandomStartKeys(15); byte[][] newStartKeys = generateRandomStartKeys(15);
util.createMultiRegions( util.createMultiRegions(
util.getConfiguration(), table, FAMILIES[0], newStartKeys); util.getConfiguration(), table, FAMILIES[0], newStartKeys);
admin.enableTable(table.getTableName()); admin.enableTable(table.getName());
while (table.getRegionLocations().size() != 15 || while (table.getRegionLocator().getAllRegionLocations().size() != 15 ||
!admin.isTableAvailable(table.getTableName())) { !admin.isTableAvailable(table.getName())) {
Thread.sleep(200); Thread.sleep(200);
LOG.info("Waiting for new region assignment to happen"); LOG.info("Waiting for new region assignment to happen");
} }
@ -452,27 +455,26 @@ public class TestHFileOutputFormat2 {
assertEquals("Data should remain after reopening of regions", assertEquals("Data should remain after reopening of regions",
tableDigestBefore, util.checksumRows(table)); tableDigestBefore, util.checksumRows(table));
} finally { } finally {
if (admin != null) admin.close();
util.shutdownMiniMapReduceCluster(); util.shutdownMiniMapReduceCluster();
util.shutdownMiniCluster(); util.shutdownMiniCluster();
} }
} }
private void runIncrementalPELoad( private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor,
Configuration conf, HTable table, Path outDir) RegionLocator regionLocator, Path outDir) throws IOException, UnsupportedEncodingException,
throws Exception { InterruptedException, ClassNotFoundException {
Job job = new Job(conf, "testLocalMRIncrementalLoad"); Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(), MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName()); KeyValueSerialization.class.getName());
setupRandomGeneratorMapper(job); setupRandomGeneratorMapper(job);
HFileOutputFormat2.configureIncrementalLoad(job, table, table); HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
FileOutputFormat.setOutputPath(job, outDir); FileOutputFormat.setOutputPath(job, outDir);
assertFalse(util.getTestFileSystem().exists(outDir)) ; assertFalse(util.getTestFileSystem().exists(outDir)) ;
assertEquals(table.getRegionLocations().size(), job.getNumReduceTasks()); assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
assertTrue(job.waitForCompletion(true)); assertTrue(job.waitForCompletion(true));
} }
@ -494,7 +496,7 @@ public class TestHFileOutputFormat2 {
getMockColumnFamiliesForCompression(numCfs); getMockColumnFamiliesForCompression(numCfs);
Table table = Mockito.mock(HTable.class); Table table = Mockito.mock(HTable.class);
setupMockColumnFamiliesForCompression(table, familyToCompression); setupMockColumnFamiliesForCompression(table, familyToCompression);
HFileOutputFormat2.configureCompression(table, conf); HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor());
// read back family specific compression setting from the configuration // read back family specific compression setting from the configuration
Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
@ -566,7 +568,7 @@ public class TestHFileOutputFormat2 {
Table table = Mockito.mock(HTable.class); Table table = Mockito.mock(HTable.class);
setupMockColumnFamiliesForBloomType(table, setupMockColumnFamiliesForBloomType(table,
familyToBloomType); familyToBloomType);
HFileOutputFormat2.configureBloomType(table, conf); HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf);
// read back family specific data block encoding settings from the // read back family specific data block encoding settings from the
// configuration // configuration
@ -637,7 +639,7 @@ public class TestHFileOutputFormat2 {
Table table = Mockito.mock(HTable.class); Table table = Mockito.mock(HTable.class);
setupMockColumnFamiliesForBlockSize(table, setupMockColumnFamiliesForBlockSize(table,
familyToBlockSize); familyToBlockSize);
HFileOutputFormat2.configureBlockSize(table, conf); HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf);
// read back family specific data block encoding settings from the // read back family specific data block encoding settings from the
// configuration // configuration
@ -696,9 +698,8 @@ public class TestHFileOutputFormat2 {
} }
/** /**
* Test for {@link HFileOutputFormat2#configureDataBlockEncoding(org.apache.hadoop.hbase.client.Table, * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)}
* Configuration)} and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
* (Configuration)}.
* Tests that the compression map is correctly serialized into * Tests that the compression map is correctly serialized into
* and deserialized from configuration * and deserialized from configuration
* *
@ -713,7 +714,8 @@ public class TestHFileOutputFormat2 {
Table table = Mockito.mock(HTable.class); Table table = Mockito.mock(HTable.class);
setupMockColumnFamiliesForDataBlockEncoding(table, setupMockColumnFamiliesForDataBlockEncoding(table,
familyToDataBlockEncoding); familyToDataBlockEncoding);
HFileOutputFormat2.configureDataBlockEncoding(table, conf); HTableDescriptor tableDescriptor = table.getTableDescriptor();
HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
// read back family specific data block encoding settings from the // read back family specific data block encoding settings from the
// configuration // configuration
@ -792,7 +794,8 @@ public class TestHFileOutputFormat2 {
Path dir = util.getDataTestDir("testColumnFamilySettings"); Path dir = util.getDataTestDir("testColumnFamilySettings");
// Setup table descriptor // Setup table descriptor
HTable table = Mockito.mock(HTable.class); Table table = Mockito.mock(Table.class);
RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
HTableDescriptor htd = new HTableDescriptor(TABLE_NAME); HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
Mockito.doReturn(htd).when(table).getTableDescriptor(); Mockito.doReturn(htd).when(table).getTableDescriptor();
for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) { for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
@ -800,7 +803,7 @@ public class TestHFileOutputFormat2 {
} }
// set up the table to return some mock keys // set up the table to return some mock keys
setupMockStartKeys(table); setupMockStartKeys(regionLocator);
try { try {
// partial map red setup to get an operational writer for testing // partial map red setup to get an operational writer for testing
@ -810,7 +813,7 @@ public class TestHFileOutputFormat2 {
Job job = new Job(conf, "testLocalMRIncrementalLoad"); Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
setupRandomGeneratorMapper(job); setupRandomGeneratorMapper(job);
HFileOutputFormat2.configureIncrementalLoad(job, table, table); HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
FileOutputFormat.setOutputPath(job, dir); FileOutputFormat.setOutputPath(job, dir);
context = createTestTaskAttemptContext(job); context = createTestTaskAttemptContext(job);
HFileOutputFormat2 hof = new HFileOutputFormat2(); HFileOutputFormat2 hof = new HFileOutputFormat2();
@ -891,10 +894,10 @@ public class TestHFileOutputFormat2 {
conf.setInt("hbase.hstore.compaction.min", 2); conf.setInt("hbase.hstore.compaction.min", 2);
generateRandomStartKeys(5); generateRandomStartKeys(5);
try {
util.startMiniCluster(); util.startMiniCluster();
try (Connection conn = ConnectionFactory.createConnection();
Admin admin = conn.getAdmin()) {
final FileSystem fs = util.getDFSCluster().getFileSystem(); final FileSystem fs = util.getDFSCluster().getFileSystem();
HBaseAdmin admin = new HBaseAdmin(conf);
HTable table = util.createTable(TABLE_NAME, FAMILIES); HTable table = util.createTable(TABLE_NAME, FAMILIES);
assertEquals("Should start with empty table", 0, util.countRows(table)); assertEquals("Should start with empty table", 0, util.countRows(table));
@ -912,7 +915,8 @@ public class TestHFileOutputFormat2 {
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
runIncrementalPELoad(conf, table, testDir); runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME),
testDir);
// Perform the actual load // Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table); new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
} }
@ -926,9 +930,10 @@ public class TestHFileOutputFormat2 {
assertEquals(2, fs.listStatus(storePath).length); assertEquals(2, fs.listStatus(storePath).length);
// minor compactions shouldn't get rid of the file // minor compactions shouldn't get rid of the file
admin.compact(TABLE_NAME.getName()); admin.compact(TABLE_NAME);
try { try {
quickPoll(new Callable<Boolean>() { quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception { public Boolean call() throws Exception {
return fs.listStatus(storePath).length == 1; return fs.listStatus(storePath).length == 1;
} }
@ -939,8 +944,9 @@ public class TestHFileOutputFormat2 {
} }
// a major compaction should work though // a major compaction should work though
admin.majorCompact(TABLE_NAME.getName()); admin.majorCompact(TABLE_NAME);
quickPoll(new Callable<Boolean>() { quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception { public Boolean call() throws Exception {
return fs.listStatus(storePath).length == 1; return fs.listStatus(storePath).length == 1;
} }
@ -958,12 +964,12 @@ public class TestHFileOutputFormat2 {
conf.setInt("hbase.hstore.compaction.min", 2); conf.setInt("hbase.hstore.compaction.min", 2);
generateRandomStartKeys(5); generateRandomStartKeys(5);
try {
util.startMiniCluster(); util.startMiniCluster();
try (Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin()){
Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
final FileSystem fs = util.getDFSCluster().getFileSystem(); final FileSystem fs = util.getDFSCluster().getFileSystem();
HBaseAdmin admin = new HBaseAdmin(conf); Table table = util.createTable(TABLE_NAME, FAMILIES);
HTable table = util.createTable(TABLE_NAME, FAMILIES);
assertEquals("Should start with empty table", 0, util.countRows(table)); assertEquals("Should start with empty table", 0, util.countRows(table));
// deep inspection: get the StoreFile dir // deep inspection: get the StoreFile dir
@ -977,9 +983,10 @@ public class TestHFileOutputFormat2 {
Put p = new Put(Bytes.toBytes("test")); Put p = new Put(Bytes.toBytes("test"));
p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
table.put(p); table.put(p);
admin.flush(TABLE_NAME.getName()); admin.flush(TABLE_NAME);
assertEquals(1, util.countRows(table)); assertEquals(1, util.countRows(table));
quickPoll(new Callable<Boolean>() { quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception { public Boolean call() throws Exception {
return fs.listStatus(storePath).length == 1; return fs.listStatus(storePath).length == 1;
} }
@ -989,10 +996,12 @@ public class TestHFileOutputFormat2 {
conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
true); true);
util.startMiniMapReduceCluster(); util.startMiniMapReduceCluster();
runIncrementalPELoad(conf, table, testDir);
RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir);
// Perform the actual load // Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table); new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
// Ensure data shows up // Ensure data shows up
int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
@ -1003,9 +1012,10 @@ public class TestHFileOutputFormat2 {
assertEquals(2, fs.listStatus(storePath).length); assertEquals(2, fs.listStatus(storePath).length);
// minor compactions shouldn't get rid of the file // minor compactions shouldn't get rid of the file
admin.compact(TABLE_NAME.getName()); admin.compact(TABLE_NAME);
try { try {
quickPoll(new Callable<Boolean>() { quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception { public Boolean call() throws Exception {
return fs.listStatus(storePath).length == 1; return fs.listStatus(storePath).length == 1;
} }
@ -1016,8 +1026,9 @@ public class TestHFileOutputFormat2 {
} }
// a major compaction should work though // a major compaction should work though
admin.majorCompact(TABLE_NAME.getName()); admin.majorCompact(TABLE_NAME);
quickPoll(new Callable<Boolean>() { quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception { public Boolean call() throws Exception {
return fs.listStatus(storePath).length == 1; return fs.listStatus(storePath).length == 1;
} }
@ -1049,18 +1060,22 @@ public class TestHFileOutputFormat2 {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
util = new HBaseTestingUtility(conf); util = new HBaseTestingUtility(conf);
if ("newtable".equals(args[0])) { if ("newtable".equals(args[0])) {
byte[] tname = args[1].getBytes(); TableName tname = TableName.valueOf(args[1]);
HTable table = util.createTable(tname, FAMILIES); try (HTable table = util.createTable(tname, FAMILIES);
HBaseAdmin admin = new HBaseAdmin(conf); Admin admin = table.getConnection().getAdmin()) {
admin.disableTable(tname); admin.disableTable(tname);
byte[][] startKeys = generateRandomStartKeys(5); byte[][] startKeys = generateRandomStartKeys(5);
util.createMultiRegions(conf, table, FAMILIES[0], startKeys); util.createMultiRegions(conf, table, FAMILIES[0], startKeys);
admin.enableTable(tname); admin.enableTable(tname);
}
} else if ("incremental".equals(args[0])) { } else if ("incremental".equals(args[0])) {
TableName tname = TableName.valueOf(args[1]); TableName tname = TableName.valueOf(args[1]);
HTable table = new HTable(conf, tname); try(Connection c = ConnectionFactory.createConnection(conf);
Admin admin = c.getAdmin();
RegionLocator regionLocator = c.getRegionLocator(tname)) {
Path outDir = new Path("incremental-out"); Path outDir = new Path("incremental-out");
runIncrementalPELoad(conf, table, outDir); runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir);
}
} else { } else {
throw new RuntimeException( throw new RuntimeException(
"usage: TestHFileOutputFormat2 newtable | incremental"); "usage: TestHFileOutputFormat2 newtable | incremental");

View File

@ -275,7 +275,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
try (Connection connection = ConnectionFactory.createConnection(this.util.getConfiguration())) { try (Connection connection = ConnectionFactory.createConnection(this.util.getConfiguration())) {
setupTable(connection, table, 10); setupTable(connection, table, 10);
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn, @Override
protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis) TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
throws IOException { throws IOException {
int i = attmptedCalls.incrementAndGet(); int i = attmptedCalls.incrementAndGet();
@ -349,7 +350,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
// files to fail when attempt to atomically import. This is recoverable. // files to fail when attempt to atomically import. This is recoverable.
final AtomicInteger attemptedCalls = new AtomicInteger(); final AtomicInteger attemptedCalls = new AtomicInteger();
LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) { LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
protected void bulkLoadPhase(final Table htable, final HConnection conn, @Override
protected void bulkLoadPhase(final Table htable, final Connection conn,
ExecutorService pool, Deque<LoadQueueItem> queue, ExecutorService pool, Deque<LoadQueueItem> queue,
final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException { final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
int i = attemptedCalls.incrementAndGet(); int i = attemptedCalls.incrementAndGet();
@ -391,9 +393,10 @@ public class TestLoadIncrementalHFilesSplitRecovery {
final AtomicInteger countedLqis= new AtomicInteger(); final AtomicInteger countedLqis= new AtomicInteger();
LoadIncrementalHFiles lih = new LoadIncrementalHFiles( LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration()) { util.getConfiguration()) {
@Override
protected List<LoadQueueItem> groupOrSplit( protected List<LoadQueueItem> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final HTable htable, final LoadQueueItem item, final Table htable,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException { final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys); List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
if (lqis != null) { if (lqis != null) {
@ -427,9 +430,10 @@ public class TestLoadIncrementalHFilesSplitRecovery {
util.getConfiguration()) { util.getConfiguration()) {
int i = 0; int i = 0;
@Override
protected List<LoadQueueItem> groupOrSplit( protected List<LoadQueueItem> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final HTable table, final LoadQueueItem item, final Table table,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException { final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
i++; i++;

View File

@ -76,11 +76,12 @@ public class TestMultiTableInputFormat {
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
// create and fill table // create and fill table
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
HTable table = try (HTable table =
TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME + String.valueOf(i)), INPUT_FAMILY); TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME + String.valueOf(i)), INPUT_FAMILY)) {
TEST_UTIL.createMultiRegions(TEST_UTIL.getConfiguration(), table, INPUT_FAMILY, 4); TEST_UTIL.createMultiRegions(TEST_UTIL.getConfiguration(), table, INPUT_FAMILY, 4);
TEST_UTIL.loadTable(table, INPUT_FAMILY, false); TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
} }
}
// start MR cluster // start MR cluster
TEST_UTIL.startMiniMapReduceCluster(); TEST_UTIL.startMiniMapReduceCluster();
} }
@ -139,6 +140,7 @@ public class TestMultiTableInputFormat {
private String first = null; private String first = null;
private String last = null; private String last = null;
@Override
protected void reduce(ImmutableBytesWritable key, protected void reduce(ImmutableBytesWritable key,
Iterable<ImmutableBytesWritable> values, Context context) Iterable<ImmutableBytesWritable> values, Context context)
throws IOException, InterruptedException { throws IOException, InterruptedException {
@ -154,6 +156,7 @@ public class TestMultiTableInputFormat {
assertEquals(3, count); assertEquals(3, count);
} }
@Override
protected void cleanup(Context context) throws IOException, protected void cleanup(Context context) throws IOException,
InterruptedException { InterruptedException {
Configuration c = context.getConfiguration(); Configuration c = context.getConfiguration();

View File

@ -308,7 +308,8 @@ public class TestServerCustomProtocol {
@Test @Test
public void testSingleMethod() throws Throwable { public void testSingleMethod() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE); try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
RegionLocator locator = table.getRegionLocator();
Map<byte [], String> results = table.coprocessorService(PingProtos.PingService.class, Map<byte [], String> results = table.coprocessorService(PingProtos.PingService.class,
null, ROW_A, null, ROW_A,
new Batch.Call<PingProtos.PingService, String>() { new Batch.Call<PingProtos.PingService, String>() {
@ -323,20 +324,21 @@ public class TestServerCustomProtocol {
// Should have gotten results for 1 of the three regions only since we specified // Should have gotten results for 1 of the three regions only since we specified
// rows from 1 region // rows from 1 region
assertEquals(1, results.size()); assertEquals(1, results.size());
verifyRegionResults(table, results, ROW_A); verifyRegionResults(locator, results, ROW_A);
final String name = "NAME"; final String name = "NAME";
results = hello(table, name, null, ROW_A); results = hello(table, name, null, ROW_A);
// Should have gotten results for 1 of the three regions only since we specified // Should have gotten results for 1 of the three regions only since we specified
// rows from 1 region // rows from 1 region
assertEquals(1, results.size()); assertEquals(1, results.size());
verifyRegionResults(table, results, "Hello, NAME", ROW_A); verifyRegionResults(locator, results, "Hello, NAME", ROW_A);
table.close(); }
} }
@Test @Test
public void testRowRange() throws Throwable { public void testRowRange() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE); try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
RegionLocator locator = table.getRegionLocator();
for (Entry<HRegionInfo, ServerName> e: table.getRegionLocations().entrySet()) { for (Entry<HRegionInfo, ServerName> e: table.getRegionLocations().entrySet()) {
LOG.info("Region " + e.getKey().getRegionNameAsString() + ", servername=" + e.getValue()); LOG.info("Region " + e.getKey().getRegionNameAsString() + ", servername=" + e.getValue());
} }
@ -349,7 +351,7 @@ public class TestServerCustomProtocol {
Map<byte [], String> results = ping(table, null, ROW_A); Map<byte [], String> results = ping(table, null, ROW_A);
// Should contain first region only. // Should contain first region only.
assertEquals(1, results.size()); assertEquals(1, results.size());
verifyRegionResults(table, results, ROW_A); verifyRegionResults(locator, results, ROW_A);
// Test start row + empty end // Test start row + empty end
results = ping(table, ROW_BC, null); results = ping(table, ROW_BC, null);
@ -358,15 +360,15 @@ public class TestServerCustomProtocol {
HRegionLocation loc = table.getRegionLocation(ROW_A, true); HRegionLocation loc = table.getRegionLocation(ROW_A, true);
assertNull("Should be missing region for row aaa (prior to start row)", assertNull("Should be missing region for row aaa (prior to start row)",
results.get(loc.getRegionInfo().getRegionName())); results.get(loc.getRegionInfo().getRegionName()));
verifyRegionResults(table, results, ROW_B); verifyRegionResults(locator, results, ROW_B);
verifyRegionResults(table, results, ROW_C); verifyRegionResults(locator, results, ROW_C);
// test empty start + end // test empty start + end
results = ping(table, null, ROW_BC); results = ping(table, null, ROW_BC);
// should contain the first 2 regions // should contain the first 2 regions
assertEquals(2, results.size()); assertEquals(2, results.size());
verifyRegionResults(table, results, ROW_A); verifyRegionResults(locator, results, ROW_A);
verifyRegionResults(table, results, ROW_B); verifyRegionResults(locator, results, ROW_B);
loc = table.getRegionLocation(ROW_C, true); loc = table.getRegionLocation(ROW_C, true);
assertNull("Should be missing region for row ccc (past stop row)", assertNull("Should be missing region for row ccc (past stop row)",
results.get(loc.getRegionInfo().getRegionName())); results.get(loc.getRegionInfo().getRegionName()));
@ -375,8 +377,8 @@ public class TestServerCustomProtocol {
results = ping(table, ROW_AB, ROW_BC); results = ping(table, ROW_AB, ROW_BC);
// should contain first 2 regions // should contain first 2 regions
assertEquals(2, results.size()); assertEquals(2, results.size());
verifyRegionResults(table, results, ROW_A); verifyRegionResults(locator, results, ROW_A);
verifyRegionResults(table, results, ROW_B); verifyRegionResults(locator, results, ROW_B);
loc = table.getRegionLocation(ROW_C, true); loc = table.getRegionLocation(ROW_C, true);
assertNull("Should be missing region for row ccc (past stop row)", assertNull("Should be missing region for row ccc (past stop row)",
results.get(loc.getRegionInfo().getRegionName())); results.get(loc.getRegionInfo().getRegionName()));
@ -385,14 +387,14 @@ public class TestServerCustomProtocol {
results = ping(table, ROW_B, ROW_BC); results = ping(table, ROW_B, ROW_BC);
// should only contain region bbb // should only contain region bbb
assertEquals(1, results.size()); assertEquals(1, results.size());
verifyRegionResults(table, results, ROW_B); verifyRegionResults(locator, results, ROW_B);
loc = table.getRegionLocation(ROW_A, true); loc = table.getRegionLocation(ROW_A, true);
assertNull("Should be missing region for row aaa (prior to start)", assertNull("Should be missing region for row aaa (prior to start)",
results.get(loc.getRegionInfo().getRegionName())); results.get(loc.getRegionInfo().getRegionName()));
loc = table.getRegionLocation(ROW_C, true); loc = table.getRegionLocation(ROW_C, true);
assertNull("Should be missing region for row ccc (past stop row)", assertNull("Should be missing region for row ccc (past stop row)",
results.get(loc.getRegionInfo().getRegionName())); results.get(loc.getRegionInfo().getRegionName()));
table.close(); }
} }
private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end) private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end)
@ -415,35 +417,40 @@ public class TestServerCustomProtocol {
@Test @Test
public void testCompoundCall() throws Throwable { public void testCompoundCall() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE); try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
RegionLocator locator = table.getRegionLocator();
Map<byte [], String> results = compoundOfHelloAndPing(table, ROW_A, ROW_C); Map<byte [], String> results = compoundOfHelloAndPing(table, ROW_A, ROW_C);
verifyRegionResults(table, results, "Hello, pong", ROW_A); verifyRegionResults(locator, results, "Hello, pong", ROW_A);
verifyRegionResults(table, results, "Hello, pong", ROW_B); verifyRegionResults(locator, results, "Hello, pong", ROW_B);
verifyRegionResults(table, results, "Hello, pong", ROW_C); verifyRegionResults(locator, results, "Hello, pong", ROW_C);
table.close(); }
} }
@Test @Test
public void testNullCall() throws Throwable { public void testNullCall() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE); try(HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
RegionLocator locator = table.getRegionLocator();
Map<byte[],String> results = hello(table, null, ROW_A, ROW_C); Map<byte[],String> results = hello(table, null, ROW_A, ROW_C);
verifyRegionResults(table, results, "Who are you?", ROW_A); verifyRegionResults(locator, results, "Who are you?", ROW_A);
verifyRegionResults(table, results, "Who are you?", ROW_B); verifyRegionResults(locator, results, "Who are you?", ROW_B);
verifyRegionResults(table, results, "Who are you?", ROW_C); verifyRegionResults(locator, results, "Who are you?", ROW_C);
}
} }
@Test @Test
public void testNullReturn() throws Throwable { public void testNullReturn() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE); try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
RegionLocator locator = table.getRegionLocator();
Map<byte[],String> results = hello(table, "nobody", ROW_A, ROW_C); Map<byte[],String> results = hello(table, "nobody", ROW_A, ROW_C);
verifyRegionResults(table, results, null, ROW_A); verifyRegionResults(locator, results, null, ROW_A);
verifyRegionResults(table, results, null, ROW_B); verifyRegionResults(locator, results, null, ROW_B);
verifyRegionResults(table, results, null, ROW_C); verifyRegionResults(locator, results, null, ROW_C);
}
} }
@Test @Test
public void testEmptyReturnType() throws Throwable { public void testEmptyReturnType() throws Throwable {
Table table = new HTable(util.getConfiguration(), TEST_TABLE); try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
Map<byte[],String> results = noop(table, ROW_A, ROW_C); Map<byte[],String> results = noop(table, ROW_A, ROW_C);
assertEquals("Should have results from three regions", 3, results.size()); assertEquals("Should have results from three regions", 3, results.size());
// all results should be null // all results should be null
@ -451,13 +458,14 @@ public class TestServerCustomProtocol {
assertNull(v); assertNull(v);
} }
} }
}
private void verifyRegionResults(RegionLocator table, private void verifyRegionResults(RegionLocator table,
Map<byte[],String> results, byte[] row) throws Exception { Map<byte[],String> results, byte[] row) throws Exception {
verifyRegionResults(table, results, "pong", row); verifyRegionResults(table, results, "pong", row);
} }
private void verifyRegionResults(RegionLocator table, private void verifyRegionResults(RegionLocator regionLocator,
Map<byte[], String> results, String expected, byte[] row) Map<byte[], String> results, String expected, byte[] row)
throws Exception { throws Exception {
for (Map.Entry<byte [], String> e: results.entrySet()) { for (Map.Entry<byte [], String> e: results.entrySet()) {
@ -465,7 +473,7 @@ public class TestServerCustomProtocol {
", result key=" + Bytes.toString(e.getKey()) + ", result key=" + Bytes.toString(e.getKey()) +
", value=" + e.getValue()); ", value=" + e.getValue());
} }
HRegionLocation loc = table.getRegionLocation(row, true); HRegionLocation loc = regionLocator.getRegionLocation(row, true);
byte[] region = loc.getRegionInfo().getRegionName(); byte[] region = loc.getRegionInfo().getRegionName();
assertTrue("Results should contain region " + assertTrue("Results should contain region " +
Bytes.toStringBinary(region) + " for row '" + Bytes.toStringBinary(row)+ "'", Bytes.toStringBinary(region) + " for row '" + Bytes.toStringBinary(row)+ "'",