diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 88e045b666a..7bfa9725f35 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -706,11 +706,7 @@ class ConnectionManager { @Override public RegionLocator getRegionLocator(TableName tableName) throws IOException { - if (managed) { - throw new IOException("The connection has to be unmanaged."); - } - return new HTable( - tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, getBatchPool()); + return new HRegionLocator(tableName, this); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java new file mode 100644 index 00000000000..fa856538f68 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java @@ -0,0 +1,148 @@ +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableMap; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Pair; + +import com.google.common.annotations.VisibleForTesting; + +/** + * An implementation of {@link RegionLocator}. Used to view region location information for a single + * HBase table. Lightweight. Get as needed and just close when done. Instances of this class SHOULD + * NOT be constructed directly. Obtain an instance via {@link Connection}. See + * {@link ConnectionFactory} class comment for an example of how. + * + *

This class is thread safe + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class HRegionLocator implements RegionLocator { + + private final TableName tableName; + private final ClusterConnection connection; + + public HRegionLocator(TableName tableName, ClusterConnection connection) { + this.connection = connection; + this.tableName = tableName; + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws IOException { + // This method is required by the RegionLocator interface. This implementation does not have any + // persistent state, so there is no need to do anything here. + } + + /** + * {@inheritDoc} + */ + @Override + public HRegionLocation getRegionLocation(final byte [] row) + throws IOException { + return connection.getRegionLocation(tableName, row, false); + } + + /** + * {@inheritDoc} + */ + @Override + public HRegionLocation getRegionLocation(final byte [] row, boolean reload) + throws IOException { + return connection.getRegionLocation(tableName, row, reload); + } + + @Override + public List getAllRegionLocations() throws IOException { + NavigableMap locations = + MetaScanner.allTableRegions(this.connection, getName()); + ArrayList regions = new ArrayList<>(locations.size()); + for (Entry entry : locations.entrySet()) { + regions.add(new HRegionLocation(entry.getKey(), entry.getValue())); + } + return regions; + } + + /** + * {@inheritDoc} + */ + @Override + public byte[][] getStartKeys() throws IOException { + return getStartEndKeys().getFirst(); + } + + /** + * {@inheritDoc} + */ + @Override + public byte[][] getEndKeys() throws IOException { + return getStartEndKeys().getSecond(); + } + + /** + * {@inheritDoc} + */ + @Override + public Pair getStartEndKeys() throws IOException { + return getStartEndKeys(listRegionLocations()); + } + + @VisibleForTesting + Pair getStartEndKeys(List regions) { + final byte[][] startKeyList = new byte[regions.size()][]; + final byte[][] endKeyList = new byte[regions.size()][]; + + for (int i = 0; i < regions.size(); i++) { + HRegionInfo region = regions.get(i).getRegionLocation().getRegionInfo(); + startKeyList[i] = region.getStartKey(); + endKeyList[i] = region.getEndKey(); + } + + return new Pair<>(startKeyList, endKeyList); + } + + @Override + public TableName getName() { + return this.tableName; + } + + @VisibleForTesting + List listRegionLocations() throws IOException { + return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName()); + } + + public Configuration getConfiguration() { + return connection.getConfiguration(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index c141b29ebf3..43ace95ac71 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -25,7 +25,6 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.Callable; @@ -48,7 +47,6 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; @@ -110,7 +108,7 @@ import com.google.protobuf.ServiceException; */ @InterfaceAudience.Private @InterfaceStability.Stable -public class HTable implements HTableInterface, RegionLocator { +public class HTable implements HTableInterface { private static final Log LOG = LogFactory.getLog(HTable.class); protected ClusterConnection connection; private final TableName tableName; @@ -127,6 +125,7 @@ public class HTable implements HTableInterface, RegionLocator { private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() private Consistency defaultConsistency = Consistency.STRONG; + private HRegionLocator locator; /** The Async process for puts with autoflush set to false or multiputs */ protected AsyncProcess ap; @@ -369,6 +368,8 @@ public class HTable implements HTableInterface, RegionLocator { multiAp = this.connection.getAsyncProcess(); this.closed = false; + + this.locator = new HRegionLocator(tableName, connection); } /** @@ -478,25 +479,25 @@ public class HTable implements HTableInterface, RegionLocator { @Deprecated public HRegionLocation getRegionLocation(final String row) throws IOException { - return connection.getRegionLocation(tableName, Bytes.toBytes(row), false); + return getRegionLocation(Bytes.toBytes(row), false); } /** - * {@inheritDoc} + * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead. */ - @Override + @Deprecated public HRegionLocation getRegionLocation(final byte [] row) throws IOException { - return connection.getRegionLocation(tableName, row, false); + return locator.getRegionLocation(row); } /** - * {@inheritDoc} + * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead. */ - @Override + @Deprecated public HRegionLocation getRegionLocation(final byte [] row, boolean reload) throws IOException { - return connection.getRegionLocation(tableName, row, reload); + return locator.getRegionLocation(row, reload); } /** @@ -602,45 +603,27 @@ public class HTable implements HTableInterface, RegionLocator { } /** - * {@inheritDoc} + * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead; */ - @Override + @Deprecated public byte [][] getStartKeys() throws IOException { - return getStartEndKeys().getFirst(); + return locator.getStartKeys(); } /** - * {@inheritDoc} + * @deprecated Use {@link RegionLocator#getEndKeys()} instead; */ - @Override + @Deprecated public byte[][] getEndKeys() throws IOException { - return getStartEndKeys().getSecond(); + return locator.getEndKeys(); } /** - * {@inheritDoc} + * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead; */ - @Override + @Deprecated public Pair getStartEndKeys() throws IOException { - - List regions = listRegionLocations(); - final List startKeyList = new ArrayList(regions.size()); - final List endKeyList = new ArrayList(regions.size()); - - for (RegionLocations locations : regions) { - HRegionInfo region = locations.getRegionLocation().getRegionInfo(); - startKeyList.add(region.getStartKey()); - endKeyList.add(region.getEndKey()); - } - - return new Pair( - startKeyList.toArray(new byte[startKeyList.size()][]), - endKeyList.toArray(new byte[endKeyList.size()][])); - } - - @VisibleForTesting - List listRegionLocations() throws IOException { - return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName()); + return locator.getStartEndKeys(); } /** @@ -663,15 +646,12 @@ public class HTable implements HTableInterface, RegionLocator { * This is mainly useful for the MapReduce integration. * @return A map of HRegionInfo with it's server address * @throws IOException if a remote or network exception occurs + * + * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead; */ - @Override + @Deprecated public List getAllRegionLocations() throws IOException { - NavigableMap locations = getRegionLocations(); - ArrayList regions = new ArrayList<>(locations.size()); - for (Entry entry : locations.entrySet()) { - regions.add(new HRegionLocation(entry.getKey(), entry.getValue())); - } - return regions; + return locator.getAllRegionLocations(); } /** @@ -1928,4 +1908,8 @@ public class HTable implements HTableInterface, RegionLocator { callbackErrorServers); } } + + public RegionLocator getRegionLocator() { + return this.locator; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java index ab77cebe370..83c9883bdc9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java @@ -62,7 +62,7 @@ public class HTableUtil { */ public static void bucketRsPut(HTable htable, List puts) throws IOException { - Map> putMap = createRsPutMap(htable, puts); + Map> putMap = createRsPutMap(htable.getRegionLocator(), puts); for (List rsPuts: putMap.values()) { htable.put( rsPuts ); } @@ -92,7 +92,7 @@ public class HTableUtil { public static void bucketRsBatch(HTable htable, List rows) throws IOException { try { - Map> rowMap = createRsRowMap(htable, rows); + Map> rowMap = createRsRowMap(htable.getRegionLocator(), rows); for (List rsRows: rowMap.values()) { htable.batch( rsRows ); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index 40ca4a4d7eb..7c07a9952d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes; public abstract class RegionServerCallable implements RetryingCallable { // Public because used outside of this package over in ipc. static final Log LOG = LogFactory.getLog(RegionServerCallable.class); - protected final HConnection connection; + protected final Connection connection; protected final TableName tableName; protected final byte[] row; protected HRegionLocation location; @@ -61,7 +61,7 @@ public abstract class RegionServerCallable implements RetryingCallable { * @param tableName Table name to which row belongs. * @param row The row we want in tableName. */ - public RegionServerCallable(HConnection connection, TableName tableName, byte [] row) { + public RegionServerCallable(Connection connection, TableName tableName, byte [] row) { this.connection = connection; this.tableName = tableName; this.row = row; @@ -75,7 +75,9 @@ public abstract class RegionServerCallable implements RetryingCallable { */ @Override public void prepare(final boolean reload) throws IOException { - this.location = connection.getRegionLocation(tableName, row, reload); + try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) { + this.location = regionLocator.getRegionLocation(row, reload); + } if (this.location == null) { throw new IOException("Failed to find location, tableName=" + tableName + ", row=" + Bytes.toString(row) + ", reload=" + reload); @@ -87,7 +89,7 @@ public abstract class RegionServerCallable implements RetryingCallable { * @return {@link HConnection} instance used by this Callable. */ HConnection getConnection() { - return this.connection; + return (HConnection) this.connection; } protected ClientService.BlockingInterface getStub() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 49c7efdadb2..b2020bde4b0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -98,8 +98,8 @@ public class RpcRetryingCaller { } public void cancel(){ - cancelled.set(true); synchronized (cancelled){ + cancelled.set(true); cancelled.notifyAll(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index 8e532e55d93..6bc5accf227 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -127,7 +127,6 @@ public class MetaTableLocator { * @param zkw zookeeper connection to use * @return server name or null if we failed to get the data. */ - @Nullable public ServerName getMetaRegionLocation(final ZooKeeperWatcher zkw) { try { RegionState state = getMetaRegionState(zkw); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java index 0ad65c3b55f..8773d0a489d 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java @@ -18,17 +18,10 @@ */ package org.apache.hadoop.hbase.mapreduce; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; +import static org.junit.Assert.assertEquals; import com.google.common.base.Joiner; + import org.apache.commons.cli.CommandLine; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; @@ -42,20 +35,23 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.IntegrationTestBase; import org.apache.hadoop.hbase.IntegrationTestingUtility; -import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Consistency; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.RegionSplitter; @@ -79,7 +75,15 @@ import org.apache.hadoop.util.ToolRunner; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; /** * Test Bulk Load and MR on a distributed cluster. @@ -247,7 +251,6 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { EnvironmentEdgeManager.currentTime(); Configuration conf = new Configuration(util.getConfiguration()); Path p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration); - HTable table = new HTable(conf, getTablename()); conf.setBoolean("mapreduce.map.speculative", false); conf.setBoolean("mapreduce.reduce.speculative", false); @@ -273,18 +276,23 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { // Set where to place the hfiles. FileOutputFormat.setOutputPath(job, p); + try (Connection conn = ConnectionFactory.createConnection(conf); + Admin admin = conn.getAdmin(); + Table table = conn.getTable(getTablename()); + RegionLocator regionLocator = conn.getRegionLocator(getTablename())) { + + // Configure the partitioner and other things needed for HFileOutputFormat. + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); - // Configure the partitioner and other things needed for HFileOutputFormat. - HFileOutputFormat2.configureIncrementalLoad(job, table, table); + // Run the job making sure it works. + assertEquals(true, job.waitForCompletion(true)); - // Run the job making sure it works. - assertEquals(true, job.waitForCompletion(true)); + // Create a new loader. + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); - // Create a new loader. - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); - - // Load the HFiles in. - loader.doBulkLoad(p, table); + // Load the HFiles in. + loader.doBulkLoad(p, admin, table, regionLocator); + } // Delete the files. util.getTestFileSystem().delete(p, true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index 402381b36c0..6d6feb1f8d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Table; @@ -87,7 +88,8 @@ public class HFileOutputFormat extends FileOutputFormat> cls) throws IOException { - Configuration conf = job.getConfiguration(); + /** + * Configure a MapReduce Job to perform an incremental load into the given + * table. This + *

+ * The user should be sure to set the map output value class to either KeyValue or Put before + * running this function. + */ + public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, + RegionLocator regionLocator) throws IOException { + configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class); + } + static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, + RegionLocator regionLocator, Class> cls) throws IOException, + UnsupportedEncodingException { + Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setOutputFormatClass(cls); @@ -412,7 +431,7 @@ public class HFileOutputFormat2 KeyValueSerialization.class.getName()); // Use table's region boundaries for TOP split points. - LOG.info("Looking up current regions for table " + table.getName()); + LOG.info("Looking up current regions for table " + tableDescriptor.getTableName()); List startKeys = getRegionStartKeys(regionLocator); LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count"); @@ -420,14 +439,14 @@ public class HFileOutputFormat2 configurePartitioner(job, startKeys); // Set compression algorithms based on column families - configureCompression(table, conf); - configureBloomType(table, conf); - configureBlockSize(table, conf); - configureDataBlockEncoding(table, conf); + configureCompression(conf, tableDescriptor); + configureBloomType(tableDescriptor, conf); + configureBlockSize(tableDescriptor, conf); + configureDataBlockEncoding(tableDescriptor, conf); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); - LOG.info("Incremental table " + table.getName() + " output configured."); + LOG.info("Incremental table " + regionLocator.getName() + " output configured."); } public static void configureIncrementalLoadMap(Job job, Table table) throws IOException { @@ -438,10 +457,11 @@ public class HFileOutputFormat2 job.setOutputFormatClass(HFileOutputFormat2.class); // Set compression algorithms based on column families - configureCompression(table, conf); - configureBloomType(table, conf); - configureBlockSize(table, conf); - configureDataBlockEncoding(table, conf); + configureCompression(conf, table.getTableDescriptor()); + configureBloomType(table.getTableDescriptor(), conf); + configureBlockSize(table.getTableDescriptor(), conf); + HTableDescriptor tableDescriptor = table.getTableDescriptor(); + configureDataBlockEncoding(tableDescriptor, conf); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); @@ -590,10 +610,9 @@ public class HFileOutputFormat2 @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") @VisibleForTesting - static void configureCompression( - Table table, Configuration conf) throws IOException { + static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor) + throws UnsupportedEncodingException { StringBuilder compressionConfigValue = new StringBuilder(); - HTableDescriptor tableDescriptor = table.getTableDescriptor(); if(tableDescriptor == null){ // could happen with mock table instance return; @@ -617,17 +636,16 @@ public class HFileOutputFormat2 /** * Serialize column family to block size map to configuration. * Invoked while configuring the MR job for incremental load. - * - * @param table to read the properties from + * @param tableDescriptor to read the properties from * @param conf to persist serialized values into + * * @throws IOException * on failure to read column family descriptors */ @VisibleForTesting - static void configureBlockSize( - Table table, Configuration conf) throws IOException { + static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf) + throws UnsupportedEncodingException { StringBuilder blockSizeConfigValue = new StringBuilder(); - HTableDescriptor tableDescriptor = table.getTableDescriptor(); if (tableDescriptor == null) { // could happen with mock table instance return; @@ -651,16 +669,15 @@ public class HFileOutputFormat2 /** * Serialize column family to bloom type map to configuration. * Invoked while configuring the MR job for incremental load. - * - * @param table to read the properties from + * @param tableDescriptor to read the properties from * @param conf to persist serialized values into + * * @throws IOException * on failure to read column family descriptors */ @VisibleForTesting - static void configureBloomType( - Table table, Configuration conf) throws IOException { - HTableDescriptor tableDescriptor = table.getTableDescriptor(); + static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf) + throws UnsupportedEncodingException { if (tableDescriptor == null) { // could happen with mock table instance return; @@ -694,9 +711,8 @@ public class HFileOutputFormat2 * on failure to read column family descriptors */ @VisibleForTesting - static void configureDataBlockEncoding(Table table, - Configuration conf) throws IOException { - HTableDescriptor tableDescriptor = table.getTableDescriptor(); + static void configureDataBlockEncoding(HTableDescriptor tableDescriptor, + Configuration conf) throws UnsupportedEncodingException { if (tableDescriptor == null) { // could happen with mock table instance return; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 399d6079914..d1e886de371 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -41,13 +41,18 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -441,15 +446,18 @@ public class Import { if (hfileOutPath != null) { job.setMapperClass(KeyValueImporter.class); - HTable table = new HTable(conf, tableName); - job.setReducerClass(KeyValueSortReducer.class); - Path outputDir = new Path(hfileOutPath); - FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - HFileOutputFormat2.configureIncrementalLoad(job, table, table); - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), - com.google.common.base.Preconditions.class); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)){ + job.setReducerClass(KeyValueSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), + com.google.common.base.Preconditions.class); + } } else { // No reducers. Just write straight to table. Call initTableReducerJob // because it sets up the TableOutputFormat. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index 54e0034eff1..90f2f0e51df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -20,17 +20,13 @@ package org.apache.hadoop.hbase.mapreduce; import static java.lang.String.format; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Set; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; @@ -40,11 +36,14 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; @@ -59,9 +58,11 @@ import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; /** * Tool to import data from a TSV file. @@ -496,7 +497,8 @@ public class ImportTsv extends Configured implements Tool { throw new TableNotFoundException(errorMsg); } } - try (HTable table = (HTable)connection.getTable(tableName)) { + try (Table table = connection.getTable(tableName); + RegionLocator regionLocator = connection.getRegionLocator(tableName)) { boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false); // if no.strict is false then check column family if(!noStrict) { @@ -534,7 +536,8 @@ public class ImportTsv extends Configured implements Tool { job.setMapOutputValueClass(Put.class); job.setCombinerClass(PutCombiner.class); } - HFileOutputFormat2.configureIncrementalLoad(job, table, table); + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), + regionLocator); } } else { if (!admin.tableExists(tableName)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index b4b6adce1cc..c5af9373934 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -20,42 +20,21 @@ package org.apache.hadoop.hbase.mapreduce; import static java.lang.String.format; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang.mutable.MutableInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -64,10 +43,14 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.Table; @@ -95,12 +78,30 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Tool to load the output of HFileOutputFormat into an existing table. @@ -235,12 +236,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool { public void doBulkLoad(Path hfofDir, final HTable table) throws TableNotFoundException, IOException { - final HConnection conn = table.getConnection(); + doBulkLoad(hfofDir, table.getConnection().getAdmin(), table, table.getRegionLocator()); + } + + /** + * Perform a bulk load of the given directory into the given + * pre-existing table. This method is not threadsafe. + * + * @param hfofDir the directory that was provided as the output path + * of a job using HFileOutputFormat + * @param table the table to load into + * @throws TableNotFoundException if table does not yet exist + */ + @SuppressWarnings("deprecation") + public void doBulkLoad(Path hfofDir, final Admin admin, Table table, + RegionLocator regionLocator) throws TableNotFoundException, IOException { - if (!conn.isTableAvailable(table.getName())) { - throw new TableNotFoundException("Table " + - Bytes.toStringBinary(table.getTableName()) + - "is not currently available."); + if (!admin.isTableAvailable(regionLocator.getName())) { + throw new TableNotFoundException("Table " + table.getName() + "is not currently available."); } // initialize thread pools @@ -276,7 +289,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { String msg = "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " + unmatchedFamilies + "; valid family names of table " - + Bytes.toString(table.getTableName()) + " are: " + familyNames; + + table.getName() + " are: " + familyNames; LOG.error(msg); throw new IOException(msg); } @@ -300,7 +313,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { // Assumes that region splits can happen while this occurs. while (!queue.isEmpty()) { // need to reload split keys each iteration. - final Pair startEndKeys = table.getStartEndKeys(); + final Pair startEndKeys = regionLocator.getStartEndKeys(); if (count != 0) { LOG.info("Split occured while grouping HFiles, retry attempt " + + count + " with " + queue.size() + " files remaining to group or split"); @@ -323,7 +336,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { + " hfiles to one family of one region"); } - bulkLoadPhase(table, conn, pool, queue, regionGroups); + bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups); // NOTE: The next iteration's split / group could happen in parallel to // atomic bulkloads assuming that there are splits and no merges, and @@ -359,7 +372,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * them. Any failures are re-queued for another pass with the * groupOrSplitPhase. */ - protected void bulkLoadPhase(final Table table, final HConnection conn, + protected void bulkLoadPhase(final Table table, final Connection conn, ExecutorService pool, Deque queue, final Multimap regionGroups) throws IOException { // atomically bulk load the groups. @@ -431,7 +444,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @return A Multimap that groups LQI by likely * bulk load region targets. */ - private Multimap groupOrSplitPhase(final HTable table, + private Multimap groupOrSplitPhase(final Table table, ExecutorService pool, Deque queue, final Pair startEndKeys) throws IOException { // need synchronized only within this scope of this @@ -524,7 +537,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @throws IOException */ protected List groupOrSplit(Multimap regionGroups, - final LoadQueueItem item, final HTable table, + final LoadQueueItem item, final Table table, final Pair startEndKeys) throws IOException { final Path hfilePath = item.hfilePath; @@ -569,18 +582,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool { */ if (indexForCallable < 0) { throw new IOException("The first region info for table " - + Bytes.toString(table.getTableName()) + + table.getName() + " cann't be found in hbase:meta.Please use hbck tool to fix it first."); } else if ((indexForCallable == startEndKeys.getFirst().length - 1) && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) { throw new IOException("The last region info for table " - + Bytes.toString(table.getTableName()) + + table.getName() + " cann't be found in hbase:meta.Please use hbck tool to fix it first."); } else if (indexForCallable + 1 < startEndKeys.getFirst().length && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable], startEndKeys.getFirst()[indexForCallable + 1]) == 0)) { throw new IOException("The endkey of one region for table " - + Bytes.toString(table.getTableName()) + + table.getName() + " is not equal to the startkey of the next region in hbase:meta." + "Please use hbck tool to fix it first."); } @@ -623,7 +636,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @return empty list if success, list of items to retry on recoverable * failure */ - protected List tryAtomicRegionLoad(final HConnection conn, + protected List tryAtomicRegionLoad(final Connection conn, final TableName tableName, final byte[] first, Collection lqis) throws IOException { final List> famPaths = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java index b9a2db7a209..890cfdd78a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java @@ -155,14 +155,10 @@ public abstract class MultiTableInputFormatBase extends Map.Entry> entry = (Map.Entry>) iter.next(); TableName tableName = entry.getKey(); List scanList = entry.getValue(); - Table table = null; - RegionLocator regionLocator = null; - Connection conn = null; - try{ - conn = ConnectionFactory.createConnection(context.getConfiguration()); - table = conn.getTable(tableName); - regionLocator = (RegionLocator) table; + try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration()); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { RegionSizeCalculator sizeCalculator = new RegionSizeCalculator( regionLocator, conn.getAdmin()); Pair keys = regionLocator.getStartEndKeys(); @@ -210,10 +206,6 @@ public abstract class MultiTableInputFormatBase extends } } } - } finally { - if (null != table) table.close(); - if (null != regionLocator) regionLocator.close(); - if (null != conn) conn.close(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index 41234673c6a..3bf001bb703 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -355,7 +355,7 @@ extends InputFormat { @Deprecated protected void setHTable(HTable table) throws IOException { this.table = table; - this.regionLocator = table; + this.regionLocator = table.getRegionLocator(); this.admin = table.getConnection().getAdmin(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index a487878a999..fb888aa9125 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -17,16 +17,8 @@ */ package org.apache.hadoop.hbase.mapreduce; -import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Map; -import java.util.TreeMap; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; @@ -36,14 +28,19 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -52,6 +49,12 @@ import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Map; +import java.util.TreeMap; + /** * A tool to replay WAL files as a M/R job. * The WAL can be replayed for a set of tables or all tables, @@ -257,13 +260,17 @@ public class WALPlayer extends Configured implements Tool { if (tables.length != 1) { throw new IOException("Exactly one table must be specified for the bulk export option"); } - HTable table = new HTable(conf, TableName.valueOf(tables[0])); + TableName tableName = TableName.valueOf(tables[0]); job.setMapperClass(WALKeyValueMapper.class); job.setReducerClass(KeyValueSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputValueClass(KeyValue.class); - HFileOutputFormat2.configureIncrementalLoad(job, table, table); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); + } TableMapReduceUtil.addDependencyJars(job.getConfiguration(), com.google.common.base.Preconditions.class); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java index 92c44101f9e..4f7c0a5d12d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java @@ -68,7 +68,7 @@ public class RegionSizeCalculator { public RegionSizeCalculator(HTable table) throws IOException { HBaseAdmin admin = new HBaseAdmin(table.getConfiguration()); try { - init(table, admin); + init(table.getRegionLocator(), admin); } finally { admin.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index ee67629c72a..74bcfa2ac24 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -2128,56 +2128,55 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { final byte[] columnFamily, byte [][] startKeys) throws IOException { Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); - Table meta = new HTable(c, TableName.META_TABLE_NAME); - HTableDescriptor htd = table.getTableDescriptor(); - if(!htd.hasFamily(columnFamily)) { - HColumnDescriptor hcd = new HColumnDescriptor(columnFamily); - htd.addFamily(hcd); - } - // remove empty region - this is tricky as the mini cluster during the test - // setup already has the ",,123456789" row with an empty start - // and end key. Adding the custom regions below adds those blindly, - // including the new start region from empty to "bbb". lg - List rows = getMetaTableRows(htd.getTableName()); - String regionToDeleteInFS = table - .getRegionsInRange(Bytes.toBytes(""), Bytes.toBytes("")).get(0) - .getRegionInfo().getEncodedName(); - List newRegions = new ArrayList(startKeys.length); - // add custom ones - int count = 0; - for (int i = 0; i < startKeys.length; i++) { - int j = (i + 1) % startKeys.length; - HRegionInfo hri = new HRegionInfo(table.getName(), - startKeys[i], startKeys[j]); - MetaTableAccessor.addRegionToMeta(meta, hri); - newRegions.add(hri); - count++; - } - // see comment above, remove "old" (or previous) single region - for (byte[] row : rows) { - LOG.info("createMultiRegions: deleting meta row -> " + - Bytes.toStringBinary(row)); - meta.delete(new Delete(row)); - } - // remove the "old" region from FS - Path tableDir = new Path(getDefaultRootDirPath().toString() - + System.getProperty("file.separator") + htd.getTableName() - + System.getProperty("file.separator") + regionToDeleteInFS); - FileSystem.get(c).delete(tableDir, true); - // flush cache of regions - HConnection conn = table.getConnection(); - conn.clearRegionCache(); - // assign all the new regions IF table is enabled. - Admin admin = getHBaseAdmin(); - if (admin.isTableEnabled(table.getName())) { - for(HRegionInfo hri : newRegions) { - admin.assign(hri.getRegionName()); + try (Table meta = new HTable(c, TableName.META_TABLE_NAME)) { + HTableDescriptor htd = table.getTableDescriptor(); + if(!htd.hasFamily(columnFamily)) { + HColumnDescriptor hcd = new HColumnDescriptor(columnFamily); + htd.addFamily(hcd); } + // remove empty region - this is tricky as the mini cluster during the test + // setup already has the ",,123456789" row with an empty start + // and end key. Adding the custom regions below adds those blindly, + // including the new start region from empty to "bbb". lg + List rows = getMetaTableRows(htd.getTableName()); + String regionToDeleteInFS = table + .getRegionsInRange(Bytes.toBytes(""), Bytes.toBytes("")).get(0) + .getRegionInfo().getEncodedName(); + List newRegions = new ArrayList(startKeys.length); + // add custom ones + int count = 0; + for (int i = 0; i < startKeys.length; i++) { + int j = (i + 1) % startKeys.length; + HRegionInfo hri = new HRegionInfo(table.getName(), + startKeys[i], startKeys[j]); + MetaTableAccessor.addRegionToMeta(meta, hri); + newRegions.add(hri); + count++; + } + // see comment above, remove "old" (or previous) single region + for (byte[] row : rows) { + LOG.info("createMultiRegions: deleting meta row -> " + + Bytes.toStringBinary(row)); + meta.delete(new Delete(row)); + } + // remove the "old" region from FS + Path tableDir = new Path(getDefaultRootDirPath().toString() + + System.getProperty("file.separator") + htd.getTableName() + + System.getProperty("file.separator") + regionToDeleteInFS); + FileSystem.get(c).delete(tableDir, true); + // flush cache of regions + HConnection conn = table.getConnection(); + conn.clearRegionCache(); + // assign all the new regions IF table is enabled. + Admin admin = conn.getAdmin(); + if (admin.isTableEnabled(table.getName())) { + for(HRegionInfo hri : newRegions) { + admin.assign(hri.getRegionName()); + } + } + + return count; } - - meta.close(); - - return count; } /** @@ -3453,10 +3452,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } public static int getMetaRSPort(Configuration conf) throws IOException { - RegionLocator table = new HTable(conf, TableName.META_TABLE_NAME); - HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes("")); - table.close(); - return hloc.getPort(); + try (Connection c = ConnectionFactory.createConnection(); + RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) { + return locator.getRegionLocation(Bytes.toBytes("")).getPort(); + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java index 1f832016736..5a5005a0734 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java @@ -26,8 +26,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -68,7 +66,7 @@ public class TestRegionRebalancing { private static final byte[] FAMILY_NAME = Bytes.toBytes("col"); public static final Log LOG = LogFactory.getLog(TestRegionRebalancing.class); private final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private RegionLocator table; + private RegionLocator regionLocator; private HTableDescriptor desc; private String balancerName; @@ -100,59 +98,59 @@ public class TestRegionRebalancing { @SuppressWarnings("deprecation") public void testRebalanceOnRegionServerNumberChange() throws IOException, InterruptedException { - Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); - Admin admin = connection.getAdmin(); - admin.createTable(this.desc, Arrays.copyOfRange(HBaseTestingUtility.KEYS, - 1, HBaseTestingUtility.KEYS.length)); - this.table = new HTable(UTIL.getConfiguration(), this.desc.getTableName()); - - MetaTableAccessor.fullScanMetaAndPrint(admin.getConnection()); - - assertEquals("Test table should have right number of regions", - HBaseTestingUtility.KEYS.length, - this.table.getStartKeys().length); - - // verify that the region assignments are balanced to start out - assertRegionsAreBalanced(); - - // add a region server - total of 2 - LOG.info("Started second server=" + - UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - UTIL.getHBaseCluster().getMaster().balance(); - assertRegionsAreBalanced(); - - // On a balanced cluster, calling balance() should return true - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - - // if we add a server, then the balance() call should return true - // add a region server - total of 3 - LOG.info("Started third server=" + + try(Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); + Admin admin = connection.getAdmin()) { + admin.createTable(this.desc, Arrays.copyOfRange(HBaseTestingUtility.KEYS, + 1, HBaseTestingUtility.KEYS.length)); + this.regionLocator = connection.getRegionLocator(this.desc.getTableName()); + + MetaTableAccessor.fullScanMetaAndPrint(admin.getConnection()); + + assertEquals("Test table should have right number of regions", + HBaseTestingUtility.KEYS.length, + this.regionLocator.getStartKeys().length); + + // verify that the region assignments are balanced to start out + assertRegionsAreBalanced(); + + // add a region server - total of 2 + LOG.info("Started second server=" + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - assertRegionsAreBalanced(); - - // kill a region server - total of 2 - LOG.info("Stopped third server=" + UTIL.getHBaseCluster().stopRegionServer(2, false)); - UTIL.getHBaseCluster().waitOnRegionServer(2); - UTIL.getHBaseCluster().getMaster().balance(); - assertRegionsAreBalanced(); - - // start two more region servers - total of 4 - LOG.info("Readding third server=" + - UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - LOG.info("Added fourth server=" + - UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - assertRegionsAreBalanced(); - - for (int i = 0; i < 6; i++){ - LOG.info("Adding " + (i + 5) + "th region server"); - UTIL.getHBaseCluster().startRegionServer(); + UTIL.getHBaseCluster().getMaster().balance(); + assertRegionsAreBalanced(); + + // On a balanced cluster, calling balance() should return true + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + + // if we add a server, then the balance() call should return true + // add a region server - total of 3 + LOG.info("Started third server=" + + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + assertRegionsAreBalanced(); + + // kill a region server - total of 2 + LOG.info("Stopped third server=" + UTIL.getHBaseCluster().stopRegionServer(2, false)); + UTIL.getHBaseCluster().waitOnRegionServer(2); + UTIL.getHBaseCluster().getMaster().balance(); + assertRegionsAreBalanced(); + + // start two more region servers - total of 4 + LOG.info("Readding third server=" + + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); + LOG.info("Added fourth server=" + + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + assertRegionsAreBalanced(); + + for (int i = 0; i < 6; i++){ + LOG.info("Adding " + (i + 5) + "th region server"); + UTIL.getHBaseCluster().startRegionServer(); + } + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + assertRegionsAreBalanced(); + regionLocator.close(); } - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - assertRegionsAreBalanced(); - table.close(); - admin.close(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index a99b0476b12..e0bf2e03e74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** * {@link ClusterConnection} testing utility. @@ -67,6 +69,23 @@ public class HConnectionTestingUtility { } } + /** + * @param connection + */ + private static void mockRegionLocator(final HConnectionImplementation connection) { + try { + Mockito.when(connection.getRegionLocator(Mockito.any(TableName.class))).thenAnswer( + new Answer() { + @Override + public RegionLocator answer(InvocationOnMock invocation) throws Throwable { + TableName tableName = (TableName) invocation.getArguments()[0]; + return new HRegionLocator(tableName, connection); + } + }); + } catch (IOException e) { + } + } + /** * Calls {@link #getMockedConnection(Configuration)} and then mocks a few * more of the popular {@link ClusterConnection} methods so they do 'normal' @@ -107,6 +126,7 @@ public class HConnectionTestingUtility { Mockito.doNothing().when(c).close(); // Make it so we return a particular location when asked. final HRegionLocation loc = new HRegionLocation(hri, sn); + mockRegionLocator(c); Mockito.when(c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean())). thenReturn(loc); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index bc805febfc3..7c2a1eae555 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -26,24 +26,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; @@ -107,6 +89,22 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + /** * Run tests that use the HBase clients; {@link HTable}. * Sets up the HBase mini cluster once at start and runs through all client tests. @@ -5203,40 +5201,41 @@ public class TestFromClientSide { TableName TABLE = TableName.valueOf("testNonCachedGetRegionLocation"); byte [] family1 = Bytes.toBytes("f1"); byte [] family2 = Bytes.toBytes("f2"); - HTable table = TEST_UTIL.createTable(TABLE, new byte[][] {family1, family2}, 10); - Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); - Map regionsMap = table.getRegionLocations(); - assertEquals(1, regionsMap.size()); - HRegionInfo regionInfo = regionsMap.keySet().iterator().next(); - ServerName addrBefore = regionsMap.get(regionInfo); - // Verify region location before move. - HRegionLocation addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); - HRegionLocation addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); - - assertEquals(addrBefore.getPort(), addrCache.getPort()); - assertEquals(addrBefore.getPort(), addrNoCache.getPort()); - - ServerName addrAfter = null; - // Now move the region to a different server. - for (int i = 0; i < SLAVES; i++) { - HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(i); - ServerName addr = regionServer.getServerName(); - if (addr.getPort() != addrBefore.getPort()) { - admin.move(regionInfo.getEncodedNameAsBytes(), - Bytes.toBytes(addr.toString())); - // Wait for the region to move. - Thread.sleep(5000); - addrAfter = addr; - break; + try (HTable table = TEST_UTIL.createTable(TABLE, new byte[][] {family1, family2}, 10); + Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration())) { + Map regionsMap = table.getRegionLocations(); + assertEquals(1, regionsMap.size()); + HRegionInfo regionInfo = regionsMap.keySet().iterator().next(); + ServerName addrBefore = regionsMap.get(regionInfo); + // Verify region location before move. + HRegionLocation addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); + HRegionLocation addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); + + assertEquals(addrBefore.getPort(), addrCache.getPort()); + assertEquals(addrBefore.getPort(), addrNoCache.getPort()); + + ServerName addrAfter = null; + // Now move the region to a different server. + for (int i = 0; i < SLAVES; i++) { + HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(i); + ServerName addr = regionServer.getServerName(); + if (addr.getPort() != addrBefore.getPort()) { + admin.move(regionInfo.getEncodedNameAsBytes(), + Bytes.toBytes(addr.toString())); + // Wait for the region to move. + Thread.sleep(5000); + addrAfter = addr; + break; + } } + + // Verify the region was moved. + addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); + addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); + assertNotNull(addrAfter); + assertTrue(addrAfter.getPort() != addrCache.getPort()); + assertEquals(addrAfter.getPort(), addrNoCache.getPort()); } - - // Verify the region was moved. - addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); - addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); - assertNotNull(addrAfter); - assertTrue(addrAfter.getPort() != addrCache.getPort()); - assertEquals(addrAfter.getPort(), addrNoCache.getPort()); } @Test @@ -6241,10 +6240,13 @@ public class TestFromClientSide { HColumnDescriptor fam = new HColumnDescriptor(FAMILY); htd.addFamily(fam); byte[][] KEYS = HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE; - TEST_UTIL.getHBaseAdmin().createTable(htd, KEYS); - List regions = TEST_UTIL.getHBaseAdmin().getTableRegions(htd.getTableName()); + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + admin.createTable(htd, KEYS); + List regions = admin.getTableRegions(htd.getTableName()); - for (int regionReplication = 1; regionReplication < 4 ; regionReplication++) { + HRegionLocator locator = + (HRegionLocator) admin.getConnection().getRegionLocator(htd.getTableName()); + for (int regionReplication = 1; regionReplication < 4; regionReplication++) { List regionLocations = new ArrayList(); // mock region locations coming from meta with multiple replicas @@ -6256,10 +6258,7 @@ public class TestFromClientSide { regionLocations.add(new RegionLocations(arr)); } - HTable table = spy(new HTable(TEST_UTIL.getConfiguration(), htd.getTableName())); - when(table.listRegionLocations()).thenReturn(regionLocations); - - Pair startEndKeys = table.getStartEndKeys(); + Pair startEndKeys = locator.getStartEndKeys(regionLocations); assertEquals(KEYS.length + 1, startEndKeys.getFirst().length); @@ -6269,9 +6268,6 @@ public class TestFromClientSide { assertArrayEquals(startKey, startEndKeys.getFirst()[i]); assertArrayEquals(endKey, startEndKeys.getSecond()[i]); } - - table.close(); } } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java index 8ed84649694..afe7e4046c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java @@ -335,9 +335,10 @@ public class TestHFileOutputFormat { public void testJobConfiguration() throws Exception { Job job = new Job(util.getConfiguration()); job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); - HTable table = Mockito.mock(HTable.class); - setupMockStartKeys(table); - HFileOutputFormat.configureIncrementalLoad(job, table); + HTableDescriptor tableDescriptor = Mockito.mock(HTableDescriptor.class); + RegionLocator regionLocator = Mockito.mock(RegionLocator.class); + setupMockStartKeys(regionLocator); + HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator); assertEquals(job.getNumReduceTasks(), 4); } @@ -467,12 +468,13 @@ public class TestHFileOutputFormat { MutationSerialization.class.getName(), ResultSerialization.class.getName(), KeyValueSerialization.class.getName()); setupRandomGeneratorMapper(job); - HFileOutputFormat.configureIncrementalLoad(job, table); + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), + table.getRegionLocator()); FileOutputFormat.setOutputPath(job, outDir); Assert.assertFalse( util.getTestFileSystem().exists(outDir)) ; - assertEquals(table.getRegionLocations().size(), job.getNumReduceTasks()); + assertEquals(table.getRegionLocator().getAllRegionLocations().size(), job.getNumReduceTasks()); assertTrue(job.waitForCompletion(true)); } @@ -769,14 +771,14 @@ public class TestHFileOutputFormat { return familyToDataBlockEncoding; } - private void setupMockStartKeys(RegionLocator table) throws IOException { + private void setupMockStartKeys(RegionLocator regionLocator) throws IOException { byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"), Bytes.toBytes("ggg"), Bytes.toBytes("zzz") }; - Mockito.doReturn(mockKeys).when(table).getStartKeys(); + Mockito.doReturn(mockKeys).when(regionLocator).getStartKeys(); } /** @@ -792,6 +794,7 @@ public class TestHFileOutputFormat { // Setup table descriptor HTable table = Mockito.mock(HTable.class); + RegionLocator regionLocator = Mockito.mock(RegionLocator.class); HTableDescriptor htd = new HTableDescriptor(TABLE_NAME); Mockito.doReturn(htd).when(table).getTableDescriptor(); for (HColumnDescriptor hcd: this.util.generateColumnDescriptors()) { @@ -799,7 +802,7 @@ public class TestHFileOutputFormat { } // set up the table to return some mock keys - setupMockStartKeys(table); + setupMockStartKeys(regionLocator); try { // partial map red setup to get an operational writer for testing @@ -809,7 +812,7 @@ public class TestHFileOutputFormat { Job job = new Job(conf, "testLocalMRIncrementalLoad"); job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); setupRandomGeneratorMapper(job); - HFileOutputFormat.configureIncrementalLoad(job, table); + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); FileOutputFormat.setOutputPath(job, dir); context = createTestTaskAttemptContext(job); HFileOutputFormat hof = new HFileOutputFormat(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 2a780d46bd4..02ee16c10a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -25,15 +25,6 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.Callable; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -53,6 +44,9 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.PerformanceEvaluation; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; @@ -86,6 +80,16 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; + /** * Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}. * Sets up and runs a mapreduce job that writes hfile output. @@ -131,6 +135,7 @@ public class TestHFileOutputFormat2 { valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); } + @Override protected void map( NullWritable n1, NullWritable n2, Mapper retrievedFamilyToCompressionMap = HFileOutputFormat2 @@ -565,7 +567,7 @@ public class TestHFileOutputFormat2 { Table table = Mockito.mock(HTable.class); setupMockColumnFamiliesForBloomType(table, familyToBloomType); - HFileOutputFormat2.configureBloomType(table, conf); + HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf); // read back family specific data block encoding settings from the // configuration @@ -636,7 +638,7 @@ public class TestHFileOutputFormat2 { Table table = Mockito.mock(HTable.class); setupMockColumnFamiliesForBlockSize(table, familyToBlockSize); - HFileOutputFormat2.configureBlockSize(table, conf); + HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf); // read back family specific data block encoding settings from the // configuration @@ -694,10 +696,9 @@ public class TestHFileOutputFormat2 { return familyToBlockSize; } - /** - * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(org.apache.hadoop.hbase.client.Table, - * Configuration)} and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap - * (Configuration)}. + /** + * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)} + * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. * Tests that the compression map is correctly serialized into * and deserialized from configuration * @@ -712,7 +713,8 @@ public class TestHFileOutputFormat2 { Table table = Mockito.mock(HTable.class); setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding); - HFileOutputFormat2.configureDataBlockEncoding(table, conf); + HTableDescriptor tableDescriptor = table.getTableDescriptor(); + HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf); // read back family specific data block encoding settings from the // configuration @@ -791,7 +793,8 @@ public class TestHFileOutputFormat2 { Path dir = util.getDataTestDir("testColumnFamilySettings"); // Setup table descriptor - HTable table = Mockito.mock(HTable.class); + Table table = Mockito.mock(Table.class); + RegionLocator regionLocator = Mockito.mock(RegionLocator.class); HTableDescriptor htd = new HTableDescriptor(TABLE_NAME); Mockito.doReturn(htd).when(table).getTableDescriptor(); for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) { @@ -799,7 +802,7 @@ public class TestHFileOutputFormat2 { } // set up the table to return some mock keys - setupMockStartKeys(table); + setupMockStartKeys(regionLocator); try { // partial map red setup to get an operational writer for testing @@ -809,7 +812,7 @@ public class TestHFileOutputFormat2 { Job job = new Job(conf, "testLocalMRIncrementalLoad"); job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); setupRandomGeneratorMapper(job); - HFileOutputFormat2.configureIncrementalLoad(job, table, table); + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); FileOutputFormat.setOutputPath(job, dir); context = createTestTaskAttemptContext(job); HFileOutputFormat2 hof = new HFileOutputFormat2(); @@ -890,10 +893,10 @@ public class TestHFileOutputFormat2 { conf.setInt("hbase.hstore.compaction.min", 2); generateRandomStartKeys(5); - try { - util.startMiniCluster(); + util.startMiniCluster(); + try (Connection conn = ConnectionFactory.createConnection(); + Admin admin = conn.getAdmin()) { final FileSystem fs = util.getDFSCluster().getFileSystem(); - HBaseAdmin admin = new HBaseAdmin(conf); HTable table = util.createTable(TABLE_NAME, FAMILIES); assertEquals("Should start with empty table", 0, util.countRows(table)); @@ -911,7 +914,8 @@ public class TestHFileOutputFormat2 { for (int i = 0; i < 2; i++) { Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); - runIncrementalPELoad(conf, table, testDir); + runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME), + testDir); // Perform the actual load new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table); } @@ -925,9 +929,10 @@ public class TestHFileOutputFormat2 { assertEquals(2, fs.listStatus(storePath).length); // minor compactions shouldn't get rid of the file - admin.compact(TABLE_NAME.getName()); + admin.compact(TABLE_NAME); try { quickPoll(new Callable() { + @Override public Boolean call() throws Exception { return fs.listStatus(storePath).length == 1; } @@ -938,8 +943,9 @@ public class TestHFileOutputFormat2 { } // a major compaction should work though - admin.majorCompact(TABLE_NAME.getName()); + admin.majorCompact(TABLE_NAME); quickPoll(new Callable() { + @Override public Boolean call() throws Exception { return fs.listStatus(storePath).length == 1; } @@ -957,12 +963,12 @@ public class TestHFileOutputFormat2 { conf.setInt("hbase.hstore.compaction.min", 2); generateRandomStartKeys(5); - try { - util.startMiniCluster(); + util.startMiniCluster(); + try (Connection conn = ConnectionFactory.createConnection(conf); + Admin admin = conn.getAdmin()){ Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); final FileSystem fs = util.getDFSCluster().getFileSystem(); - HBaseAdmin admin = new HBaseAdmin(conf); - HTable table = util.createTable(TABLE_NAME, FAMILIES); + Table table = util.createTable(TABLE_NAME, FAMILIES); assertEquals("Should start with empty table", 0, util.countRows(table)); // deep inspection: get the StoreFile dir @@ -976,9 +982,10 @@ public class TestHFileOutputFormat2 { Put p = new Put(Bytes.toBytes("test")); p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); table.put(p); - admin.flush(TABLE_NAME.getName()); + admin.flush(TABLE_NAME); assertEquals(1, util.countRows(table)); quickPoll(new Callable() { + @Override public Boolean call() throws Exception { return fs.listStatus(storePath).length == 1; } @@ -988,10 +995,12 @@ public class TestHFileOutputFormat2 { conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); util.startMiniMapReduceCluster(); - runIncrementalPELoad(conf, table, testDir); + + RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME); + runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir); // Perform the actual load - new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table); + new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator); // Ensure data shows up int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; @@ -1002,9 +1011,10 @@ public class TestHFileOutputFormat2 { assertEquals(2, fs.listStatus(storePath).length); // minor compactions shouldn't get rid of the file - admin.compact(TABLE_NAME.getName()); + admin.compact(TABLE_NAME); try { quickPoll(new Callable() { + @Override public Boolean call() throws Exception { return fs.listStatus(storePath).length == 1; } @@ -1015,8 +1025,9 @@ public class TestHFileOutputFormat2 { } // a major compaction should work though - admin.majorCompact(TABLE_NAME.getName()); + admin.majorCompact(TABLE_NAME); quickPoll(new Callable() { + @Override public Boolean call() throws Exception { return fs.listStatus(storePath).length == 1; } @@ -1048,18 +1059,22 @@ public class TestHFileOutputFormat2 { Configuration conf = HBaseConfiguration.create(); util = new HBaseTestingUtility(conf); if ("newtable".equals(args[0])) { - byte[] tname = args[1].getBytes(); - HTable table = util.createTable(tname, FAMILIES); - HBaseAdmin admin = new HBaseAdmin(conf); - admin.disableTable(tname); - byte[][] startKeys = generateRandomStartKeys(5); - util.createMultiRegions(conf, table, FAMILIES[0], startKeys); - admin.enableTable(tname); + TableName tname = TableName.valueOf(args[1]); + try (HTable table = util.createTable(tname, FAMILIES); + Admin admin = table.getConnection().getAdmin()) { + admin.disableTable(tname); + byte[][] startKeys = generateRandomStartKeys(5); + util.createMultiRegions(conf, table, FAMILIES[0], startKeys); + admin.enableTable(tname); + } } else if ("incremental".equals(args[0])) { TableName tname = TableName.valueOf(args[1]); - HTable table = new HTable(conf, tname); - Path outDir = new Path("incremental-out"); - runIncrementalPELoad(conf, table, outDir); + try(Connection c = ConnectionFactory.createConnection(conf); + Admin admin = c.getAdmin(); + RegionLocator regionLocator = c.getRegionLocator(tname)) { + Path outDir = new Path("incremental-out"); + runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir); + } } else { throw new RuntimeException( "usage: TestHFileOutputFormat2 newtable | incremental"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java index e7ee0abc419..c4ac82778e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -274,7 +274,8 @@ public class TestLoadIncrementalHFilesSplitRecovery { try (Connection connection = ConnectionFactory.createConnection(this.util.getConfiguration())) { setupTable(connection, table, 10); LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { - protected List tryAtomicRegionLoad(final HConnection conn, + @Override + protected List tryAtomicRegionLoad(final Connection conn, TableName tableName, final byte[] first, Collection lqis) throws IOException { int i = attmptedCalls.incrementAndGet(); @@ -348,7 +349,8 @@ public class TestLoadIncrementalHFilesSplitRecovery { // files to fail when attempt to atomically import. This is recoverable. final AtomicInteger attemptedCalls = new AtomicInteger(); LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) { - protected void bulkLoadPhase(final Table htable, final HConnection conn, + @Override + protected void bulkLoadPhase(final Table htable, final Connection conn, ExecutorService pool, Deque queue, final Multimap regionGroups) throws IOException { int i = attemptedCalls.incrementAndGet(); @@ -390,9 +392,10 @@ public class TestLoadIncrementalHFilesSplitRecovery { final AtomicInteger countedLqis= new AtomicInteger(); LoadIncrementalHFiles lih = new LoadIncrementalHFiles( util.getConfiguration()) { + @Override protected List groupOrSplit( Multimap regionGroups, - final LoadQueueItem item, final HTable htable, + final LoadQueueItem item, final Table htable, final Pair startEndKeys) throws IOException { List lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys); if (lqis != null) { @@ -426,9 +429,10 @@ public class TestLoadIncrementalHFilesSplitRecovery { util.getConfiguration()) { int i = 0; + @Override protected List groupOrSplit( Multimap regionGroups, - final LoadQueueItem item, final HTable table, + final LoadQueueItem item, final Table table, final Pair startEndKeys) throws IOException { i++; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java index d8f6b2480e0..3c6bb8f0b97 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java @@ -75,10 +75,11 @@ public class TestMultiTableInputFormat { TEST_UTIL.startMiniCluster(3); // create and fill table for (int i = 0; i < 3; i++) { - HTable table = - TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME + String.valueOf(i)), INPUT_FAMILY); - TEST_UTIL.createMultiRegions(TEST_UTIL.getConfiguration(), table, INPUT_FAMILY, 4); - TEST_UTIL.loadTable(table, INPUT_FAMILY, false); + try (HTable table = + TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME + String.valueOf(i)), INPUT_FAMILY)) { + TEST_UTIL.createMultiRegions(TEST_UTIL.getConfiguration(), table, INPUT_FAMILY, 4); + TEST_UTIL.loadTable(table, INPUT_FAMILY, false); + } } // start MR cluster TEST_UTIL.startMiniMapReduceCluster(); @@ -138,6 +139,7 @@ public class TestMultiTableInputFormat { private String first = null; private String last = null; + @Override protected void reduce(ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException { @@ -153,6 +155,7 @@ public class TestMultiTableInputFormat { assertEquals(3, count); } + @Override protected void cleanup(Context context) throws IOException, InterruptedException { Configuration c = context.getConfiguration(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java index 1f0ab99721d..086cb804082 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java @@ -307,91 +307,93 @@ public class TestServerCustomProtocol { @Test public void testSingleMethod() throws Throwable { - HTable table = new HTable(util.getConfiguration(), TEST_TABLE); - Map results = table.coprocessorService(PingProtos.PingService.class, - null, ROW_A, - new Batch.Call() { - @Override - public String call(PingProtos.PingService instance) throws IOException { - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback); - return rpcCallback.get().getPong(); - } - }); - // Should have gotten results for 1 of the three regions only since we specified - // rows from 1 region - assertEquals(1, results.size()); - verifyRegionResults(table, results, ROW_A); - - final String name = "NAME"; - results = hello(table, name, null, ROW_A); - // Should have gotten results for 1 of the three regions only since we specified - // rows from 1 region - assertEquals(1, results.size()); - verifyRegionResults(table, results, "Hello, NAME", ROW_A); - table.close(); + try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) { + RegionLocator locator = table.getRegionLocator(); + Map results = table.coprocessorService(PingProtos.PingService.class, + null, ROW_A, + new Batch.Call() { + @Override + public String call(PingProtos.PingService instance) throws IOException { + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback); + return rpcCallback.get().getPong(); + } + }); + // Should have gotten results for 1 of the three regions only since we specified + // rows from 1 region + assertEquals(1, results.size()); + verifyRegionResults(locator, results, ROW_A); + + final String name = "NAME"; + results = hello(table, name, null, ROW_A); + // Should have gotten results for 1 of the three regions only since we specified + // rows from 1 region + assertEquals(1, results.size()); + verifyRegionResults(locator, results, "Hello, NAME", ROW_A); + } } @Test public void testRowRange() throws Throwable { - HTable table = new HTable(util.getConfiguration(), TEST_TABLE); - for (Entry e: table.getRegionLocations().entrySet()) { - LOG.info("Region " + e.getKey().getRegionNameAsString() + ", servername=" + e.getValue()); + try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) { + RegionLocator locator = table.getRegionLocator(); + for (Entry e: table.getRegionLocations().entrySet()) { + LOG.info("Region " + e.getKey().getRegionNameAsString() + ", servername=" + e.getValue()); + } + // Here are what regions looked like on a run: + // + // test,,1355943549657.c65d4822d8bdecc033a96451f3a0f55d. + // test,bbb,1355943549661.110393b070dd1ed93441e0bc9b3ffb7e. + // test,ccc,1355943549665.c3d6d125141359cbbd2a43eaff3cdf74. + + Map results = ping(table, null, ROW_A); + // Should contain first region only. + assertEquals(1, results.size()); + verifyRegionResults(locator, results, ROW_A); + + // Test start row + empty end + results = ping(table, ROW_BC, null); + assertEquals(2, results.size()); + // should contain last 2 regions + HRegionLocation loc = table.getRegionLocation(ROW_A, true); + assertNull("Should be missing region for row aaa (prior to start row)", + results.get(loc.getRegionInfo().getRegionName())); + verifyRegionResults(locator, results, ROW_B); + verifyRegionResults(locator, results, ROW_C); + + // test empty start + end + results = ping(table, null, ROW_BC); + // should contain the first 2 regions + assertEquals(2, results.size()); + verifyRegionResults(locator, results, ROW_A); + verifyRegionResults(locator, results, ROW_B); + loc = table.getRegionLocation(ROW_C, true); + assertNull("Should be missing region for row ccc (past stop row)", + results.get(loc.getRegionInfo().getRegionName())); + + // test explicit start + end + results = ping(table, ROW_AB, ROW_BC); + // should contain first 2 regions + assertEquals(2, results.size()); + verifyRegionResults(locator, results, ROW_A); + verifyRegionResults(locator, results, ROW_B); + loc = table.getRegionLocation(ROW_C, true); + assertNull("Should be missing region for row ccc (past stop row)", + results.get(loc.getRegionInfo().getRegionName())); + + // test single region + results = ping(table, ROW_B, ROW_BC); + // should only contain region bbb + assertEquals(1, results.size()); + verifyRegionResults(locator, results, ROW_B); + loc = table.getRegionLocation(ROW_A, true); + assertNull("Should be missing region for row aaa (prior to start)", + results.get(loc.getRegionInfo().getRegionName())); + loc = table.getRegionLocation(ROW_C, true); + assertNull("Should be missing region for row ccc (past stop row)", + results.get(loc.getRegionInfo().getRegionName())); } - // Here are what regions looked like on a run: - // - // test,,1355943549657.c65d4822d8bdecc033a96451f3a0f55d. - // test,bbb,1355943549661.110393b070dd1ed93441e0bc9b3ffb7e. - // test,ccc,1355943549665.c3d6d125141359cbbd2a43eaff3cdf74. - - Map results = ping(table, null, ROW_A); - // Should contain first region only. - assertEquals(1, results.size()); - verifyRegionResults(table, results, ROW_A); - - // Test start row + empty end - results = ping(table, ROW_BC, null); - assertEquals(2, results.size()); - // should contain last 2 regions - HRegionLocation loc = table.getRegionLocation(ROW_A, true); - assertNull("Should be missing region for row aaa (prior to start row)", - results.get(loc.getRegionInfo().getRegionName())); - verifyRegionResults(table, results, ROW_B); - verifyRegionResults(table, results, ROW_C); - - // test empty start + end - results = ping(table, null, ROW_BC); - // should contain the first 2 regions - assertEquals(2, results.size()); - verifyRegionResults(table, results, ROW_A); - verifyRegionResults(table, results, ROW_B); - loc = table.getRegionLocation(ROW_C, true); - assertNull("Should be missing region for row ccc (past stop row)", - results.get(loc.getRegionInfo().getRegionName())); - - // test explicit start + end - results = ping(table, ROW_AB, ROW_BC); - // should contain first 2 regions - assertEquals(2, results.size()); - verifyRegionResults(table, results, ROW_A); - verifyRegionResults(table, results, ROW_B); - loc = table.getRegionLocation(ROW_C, true); - assertNull("Should be missing region for row ccc (past stop row)", - results.get(loc.getRegionInfo().getRegionName())); - - // test single region - results = ping(table, ROW_B, ROW_BC); - // should only contain region bbb - assertEquals(1, results.size()); - verifyRegionResults(table, results, ROW_B); - loc = table.getRegionLocation(ROW_A, true); - assertNull("Should be missing region for row aaa (prior to start)", - results.get(loc.getRegionInfo().getRegionName())); - loc = table.getRegionLocation(ROW_C, true); - assertNull("Should be missing region for row ccc (past stop row)", - results.get(loc.getRegionInfo().getRegionName())); - table.close(); } private Map ping(final Table table, final byte [] start, final byte [] end) @@ -414,40 +416,46 @@ public class TestServerCustomProtocol { @Test public void testCompoundCall() throws Throwable { - HTable table = new HTable(util.getConfiguration(), TEST_TABLE); - Map results = compoundOfHelloAndPing(table, ROW_A, ROW_C); - verifyRegionResults(table, results, "Hello, pong", ROW_A); - verifyRegionResults(table, results, "Hello, pong", ROW_B); - verifyRegionResults(table, results, "Hello, pong", ROW_C); - table.close(); + try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) { + RegionLocator locator = table.getRegionLocator(); + Map results = compoundOfHelloAndPing(table, ROW_A, ROW_C); + verifyRegionResults(locator, results, "Hello, pong", ROW_A); + verifyRegionResults(locator, results, "Hello, pong", ROW_B); + verifyRegionResults(locator, results, "Hello, pong", ROW_C); + } } @Test public void testNullCall() throws Throwable { - HTable table = new HTable(util.getConfiguration(), TEST_TABLE); - Map results = hello(table, null, ROW_A, ROW_C); - verifyRegionResults(table, results, "Who are you?", ROW_A); - verifyRegionResults(table, results, "Who are you?", ROW_B); - verifyRegionResults(table, results, "Who are you?", ROW_C); + try(HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) { + RegionLocator locator = table.getRegionLocator(); + Map results = hello(table, null, ROW_A, ROW_C); + verifyRegionResults(locator, results, "Who are you?", ROW_A); + verifyRegionResults(locator, results, "Who are you?", ROW_B); + verifyRegionResults(locator, results, "Who are you?", ROW_C); + } } @Test public void testNullReturn() throws Throwable { - HTable table = new HTable(util.getConfiguration(), TEST_TABLE); - Map results = hello(table, "nobody", ROW_A, ROW_C); - verifyRegionResults(table, results, null, ROW_A); - verifyRegionResults(table, results, null, ROW_B); - verifyRegionResults(table, results, null, ROW_C); + try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) { + RegionLocator locator = table.getRegionLocator(); + Map results = hello(table, "nobody", ROW_A, ROW_C); + verifyRegionResults(locator, results, null, ROW_A); + verifyRegionResults(locator, results, null, ROW_B); + verifyRegionResults(locator, results, null, ROW_C); + } } @Test public void testEmptyReturnType() throws Throwable { - Table table = new HTable(util.getConfiguration(), TEST_TABLE); - Map results = noop(table, ROW_A, ROW_C); - assertEquals("Should have results from three regions", 3, results.size()); - // all results should be null - for (Object v : results.values()) { - assertNull(v); + try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) { + Map results = noop(table, ROW_A, ROW_C); + assertEquals("Should have results from three regions", 3, results.size()); + // all results should be null + for (Object v : results.values()) { + assertNull(v); + } } } @@ -456,7 +464,7 @@ public class TestServerCustomProtocol { verifyRegionResults(table, results, "pong", row); } - private void verifyRegionResults(RegionLocator table, + private void verifyRegionResults(RegionLocator regionLocator, Map results, String expected, byte[] row) throws Exception { for (Map.Entry e: results.entrySet()) { @@ -464,7 +472,7 @@ public class TestServerCustomProtocol { ", result key=" + Bytes.toString(e.getKey()) + ", value=" + e.getValue()); } - HRegionLocation loc = table.getRegionLocation(row, true); + HRegionLocation loc = regionLocator.getRegionLocation(row, true); byte[] region = loc.getRegionInfo().getRegionName(); assertTrue("Results should contain region " + Bytes.toStringBinary(region) + " for row '" + Bytes.toStringBinary(row)+ "'",